diff --git a/query/src/main/java/tech/ydb/query/impl/SessionImpl.java b/query/src/main/java/tech/ydb/query/impl/SessionImpl.java index 19f76e506..df04c396c 100644 --- a/query/src/main/java/tech/ydb/query/impl/SessionImpl.java +++ b/query/src/main/java/tech/ydb/query/impl/SessionImpl.java @@ -182,16 +182,6 @@ private static YdbQuery.StatsMode mapStatsMode(QueryStatsMode mode) { } } - private String mapPoolId(ExecuteQuerySettings settings) { - String actualPoolId = settings.getResourcePool(); - - if (actualPoolId == null) { - return YdbQuery.ExecuteQueryRequest.getDefaultInstance().getPoolId(); - } - - return actualPoolId; - } - GrpcReadStream createGrpcStream( String query, YdbQuery.TransactionControl tx, Params prms, ExecuteQuerySettings settings ) { @@ -199,14 +189,19 @@ GrpcReadStream createGrpcStream( .setSessionId(sessionId) .setExecMode(mapExecMode(settings.getExecMode())) .setStatsMode(mapStatsMode(settings.getStatsMode())) + .setConcurrentResultSets(settings.isConcurrentResultSets()) .setQueryContent(YdbQuery.QueryContent.newBuilder() .setSyntax(YdbQuery.Syntax.SYNTAX_YQL_V1) .setText(query) .build() ) - .setPoolId(mapPoolId(settings)) .putAllParameters(prms.toPb()); + String resourcePool = settings.getResourcePool(); + if (resourcePool != null && !resourcePool.isEmpty()) { + requestBuilder.setPoolId(resourcePool); + } + if (tx != null) { requestBuilder.setTxControl(tx); } diff --git a/query/src/main/java/tech/ydb/query/settings/ExecuteQuerySettings.java b/query/src/main/java/tech/ydb/query/settings/ExecuteQuerySettings.java index b85be17a6..707618dc5 100644 --- a/query/src/main/java/tech/ydb/query/settings/ExecuteQuerySettings.java +++ b/query/src/main/java/tech/ydb/query/settings/ExecuteQuerySettings.java @@ -9,16 +9,14 @@ public class ExecuteQuerySettings extends BaseRequestSettings { private final QueryExecMode execMode; private final QueryStatsMode statsMode; - - /** - * Resource pool - */ + private final boolean concurrentResultSets; private final String resourcePool; private ExecuteQuerySettings(Builder builder) { super(builder); this.execMode = builder.execMode; this.statsMode = builder.statsMode; + this.concurrentResultSets = builder.concurrentResultSets; this.resourcePool = builder.resourcePool; } @@ -30,6 +28,14 @@ public QueryStatsMode getStatsMode() { return this.statsMode; } + public boolean isConcurrentResultSets() { + return this.concurrentResultSets; + } + + /** + * Get resource pool for query execution + * @return resource pool name + */ public String getResourcePool() { return this.resourcePool; } @@ -41,6 +47,7 @@ public static Builder newBuilder() { public static class Builder extends BaseBuilder { private QueryExecMode execMode = QueryExecMode.EXECUTE; private QueryStatsMode statsMode = QueryStatsMode.NONE; + private boolean concurrentResultSets = false; private String resourcePool = null; public Builder withExecMode(QueryExecMode mode) { @@ -53,12 +60,17 @@ public Builder withStatsMode(QueryStatsMode mode) { return this; } + public Builder withConcurrentResultSets(boolean value) { + this.concurrentResultSets = value; + return this; + } + /** * Set resource pool which query try to use. * If no pool specify or poolId is empty or poolId equals "default" - * the undeleted resource pool "default" wll be used + * the unremovable resource pool "default" will be used * - * @param poolId poolId in ydb + * @param poolId resource pool identifier * * @return builder */ diff --git a/query/src/test/java/tech/ydb/query/impl/QueryIntegrationTest.java b/query/src/test/java/tech/ydb/query/impl/QueryIntegrationTest.java index 4c348da63..d3d434424 100644 --- a/query/src/test/java/tech/ydb/query/impl/QueryIntegrationTest.java +++ b/query/src/test/java/tech/ydb/query/impl/QueryIntegrationTest.java @@ -1,9 +1,13 @@ package tech.ydb.query.impl; import java.time.Duration; +import java.util.ArrayDeque; import java.util.ArrayList; +import java.util.Deque; +import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Set; import java.util.concurrent.CompletableFuture; import org.junit.AfterClass; @@ -315,6 +319,46 @@ public void updateMultipleTablesInOneTransaction() { } } + @Test + public void concurrentResultSetsTest() { + try (QueryClient client = QueryClient.newClient(ydbTransport).build()) { + try (QuerySession session = client.createSession(Duration.ofSeconds(5)).join().getValue()) { + String query = "" + + "SELECT id, name, payload, is_valid FROM " + TEST_TABLE + " ORDER BY id;\n" + + "SELECT 3 + 5;\n" + + "SELECT id, name, payload, is_valid FROM " + TEST_TABLE + " ORDER BY id DESC;\n" + + "SELECT 3 + 1;\n"; + + // consistent read - all result sets are ordered + QueryStream qs1 = session.createQuery(query, TxMode.SNAPSHOT_RO, Params.empty(), + ExecuteQuerySettings.newBuilder().withConcurrentResultSets(false).build()); + + Deque ordered = new ArrayDeque<>(); + Result res1 = qs1.execute(part -> { + Long id = part.getResultSetIndex(); + if (!ordered.isEmpty()) { + Assert.assertTrue(id >= ordered.getLast()); + } + ordered.addLast(id); + }).join(); + Assert.assertTrue(res1.isSuccess()); + + // concurrent read - all result sets are unordered + QueryStream qs2 = session.createQuery(query, TxMode.SNAPSHOT_RO, Params.empty(), + ExecuteQuerySettings.newBuilder().withConcurrentResultSets(true).build()); + + Set unordered = new HashSet<>(); + Result res2 = qs2.execute(part -> { + unordered.add(part.getResultSetIndex()); + }).join(); + Assert.assertTrue(res2.isSuccess()); + + Assert.assertTrue(ordered.containsAll(unordered)); + Assert.assertTrue(unordered.containsAll(ordered)); + } + } + } + @Test public void interactiveTransaction() { try (QueryClient client = QueryClient.newClient(ydbTransport).build()) {