Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 6 additions & 11 deletions query/src/main/java/tech/ydb/query/impl/SessionImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -182,31 +182,26 @@ private static YdbQuery.StatsMode mapStatsMode(QueryStatsMode mode) {
}
}

private String mapPoolId(ExecuteQuerySettings settings) {
String actualPoolId = settings.getResourcePool();

if (actualPoolId == null) {
return YdbQuery.ExecuteQueryRequest.getDefaultInstance().getPoolId();
}

return actualPoolId;
}

GrpcReadStream<YdbQuery.ExecuteQueryResponsePart> createGrpcStream(
String query, YdbQuery.TransactionControl tx, Params prms, ExecuteQuerySettings settings
) {
YdbQuery.ExecuteQueryRequest.Builder requestBuilder = YdbQuery.ExecuteQueryRequest.newBuilder()
.setSessionId(sessionId)
.setExecMode(mapExecMode(settings.getExecMode()))
.setStatsMode(mapStatsMode(settings.getStatsMode()))
.setConcurrentResultSets(settings.isConcurrentResultSets())
.setQueryContent(YdbQuery.QueryContent.newBuilder()
.setSyntax(YdbQuery.Syntax.SYNTAX_YQL_V1)
.setText(query)
.build()
)
.setPoolId(mapPoolId(settings))
.putAllParameters(prms.toPb());

String resourcePool = settings.getResourcePool();
if (resourcePool != null && !resourcePool.isEmpty()) {
requestBuilder.setPoolId(resourcePool);
}

if (tx != null) {
requestBuilder.setTxControl(tx);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,14 @@
public class ExecuteQuerySettings extends BaseRequestSettings {
private final QueryExecMode execMode;
private final QueryStatsMode statsMode;

/**
* Resource pool
*/
private final boolean concurrentResultSets;
private final String resourcePool;

private ExecuteQuerySettings(Builder builder) {
super(builder);
this.execMode = builder.execMode;
this.statsMode = builder.statsMode;
this.concurrentResultSets = builder.concurrentResultSets;
this.resourcePool = builder.resourcePool;
}

Expand All @@ -30,6 +28,14 @@ public QueryStatsMode getStatsMode() {
return this.statsMode;
}

public boolean isConcurrentResultSets() {
return this.concurrentResultSets;
}

/**
* Get resource pool for query execution
* @return resource pool name
*/
public String getResourcePool() {
return this.resourcePool;
}
Expand All @@ -41,6 +47,7 @@ public static Builder newBuilder() {
public static class Builder extends BaseBuilder<Builder> {
private QueryExecMode execMode = QueryExecMode.EXECUTE;
private QueryStatsMode statsMode = QueryStatsMode.NONE;
private boolean concurrentResultSets = false;
private String resourcePool = null;

public Builder withExecMode(QueryExecMode mode) {
Expand All @@ -53,12 +60,17 @@ public Builder withStatsMode(QueryStatsMode mode) {
return this;
}

public Builder withConcurrentResultSets(boolean value) {
this.concurrentResultSets = value;
return this;
}

/**
* Set resource pool which query try to use.
* If no pool specify or poolId is empty or poolId equals "default"
* the undeleted resource pool "default" wll be used
* the unremovable resource pool "default" will be used
*
* @param poolId poolId in ydb
* @param poolId resource pool identifier
*
* @return builder
*/
Expand Down
44 changes: 44 additions & 0 deletions query/src/test/java/tech/ydb/query/impl/QueryIntegrationTest.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package tech.ydb.query.impl;

import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

import org.junit.AfterClass;
Expand Down Expand Up @@ -315,6 +319,46 @@ public void updateMultipleTablesInOneTransaction() {
}
}

@Test
public void concurrentResultSetsTest() {
try (QueryClient client = QueryClient.newClient(ydbTransport).build()) {
try (QuerySession session = client.createSession(Duration.ofSeconds(5)).join().getValue()) {
String query = ""
+ "SELECT id, name, payload, is_valid FROM " + TEST_TABLE + " ORDER BY id;\n"
+ "SELECT 3 + 5;\n"
+ "SELECT id, name, payload, is_valid FROM " + TEST_TABLE + " ORDER BY id DESC;\n"
+ "SELECT 3 + 1;\n";

// consistent read - all result sets are ordered
QueryStream qs1 = session.createQuery(query, TxMode.SNAPSHOT_RO, Params.empty(),
ExecuteQuerySettings.newBuilder().withConcurrentResultSets(false).build());

Deque<Long> ordered = new ArrayDeque<>();
Result<QueryInfo> res1 = qs1.execute(part -> {
Long id = part.getResultSetIndex();
if (!ordered.isEmpty()) {
Assert.assertTrue(id >= ordered.getLast());
}
ordered.addLast(id);
}).join();
Assert.assertTrue(res1.isSuccess());

// concurrent read - all result sets are unordered
QueryStream qs2 = session.createQuery(query, TxMode.SNAPSHOT_RO, Params.empty(),
ExecuteQuerySettings.newBuilder().withConcurrentResultSets(true).build());

Set<Long> unordered = new HashSet<>();
Result<QueryInfo> res2 = qs2.execute(part -> {
unordered.add(part.getResultSetIndex());
}).join();
Assert.assertTrue(res2.isSuccess());

Assert.assertTrue(ordered.containsAll(unordered));
Assert.assertTrue(unordered.containsAll(ordered));
}
}
}

@Test
public void interactiveTransaction() {
try (QueryClient client = QueryClient.newClient(ydbTransport).build()) {
Expand Down