Skip to content

Commit 88678c6

Browse files
authored
Merge pull request #382 from ekuvardin/335.Support-pool-id
335.support pool
2 parents 9eb4f42 + 1ce80ac commit 88678c6

File tree

3 files changed

+266
-0
lines changed

3 files changed

+266
-0
lines changed

query/src/main/java/tech/ydb/query/impl/SessionImpl.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,16 @@ private static YdbQuery.StatsMode mapStatsMode(QueryStatsMode mode) {
182182
}
183183
}
184184

185+
private String mapPoolId(ExecuteQuerySettings settings) {
186+
String actualPoolId = settings.getResourcePool();
187+
188+
if (actualPoolId == null) {
189+
return YdbQuery.ExecuteQueryRequest.getDefaultInstance().getPoolId();
190+
}
191+
192+
return actualPoolId;
193+
}
194+
185195
GrpcReadStream<YdbQuery.ExecuteQueryResponsePart> createGrpcStream(
186196
String query, YdbQuery.TransactionControl tx, Params prms, ExecuteQuerySettings settings
187197
) {
@@ -194,6 +204,7 @@ GrpcReadStream<YdbQuery.ExecuteQueryResponsePart> createGrpcStream(
194204
.setText(query)
195205
.build()
196206
)
207+
.setPoolId(mapPoolId(settings))
197208
.putAllParameters(prms.toPb());
198209

199210
if (tx != null) {

query/src/main/java/tech/ydb/query/settings/ExecuteQuerySettings.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,16 @@ public class ExecuteQuerySettings extends BaseRequestSettings {
1010
private final QueryExecMode execMode;
1111
private final QueryStatsMode statsMode;
1212

13+
/**
14+
* Resource pool
15+
*/
16+
private final String resourcePool;
17+
1318
private ExecuteQuerySettings(Builder builder) {
1419
super(builder);
1520
this.execMode = builder.execMode;
1621
this.statsMode = builder.statsMode;
22+
this.resourcePool = builder.resourcePool;
1723
}
1824

1925
public QueryExecMode getExecMode() {
@@ -24,13 +30,18 @@ public QueryStatsMode getStatsMode() {
2430
return this.statsMode;
2531
}
2632

33+
public String getResourcePool() {
34+
return this.resourcePool;
35+
}
36+
2737
public static Builder newBuilder() {
2838
return new Builder();
2939
}
3040

3141
public static class Builder extends BaseBuilder<Builder> {
3242
private QueryExecMode execMode = QueryExecMode.EXECUTE;
3343
private QueryStatsMode statsMode = QueryStatsMode.NONE;
44+
private String resourcePool = null;
3445

3546
public Builder withExecMode(QueryExecMode mode) {
3647
this.execMode = mode;
@@ -42,6 +53,20 @@ public Builder withStatsMode(QueryStatsMode mode) {
4253
return this;
4354
}
4455

56+
/**
57+
* Set resource pool which query try to use.
58+
* If no pool specify or poolId is empty or poolId equals "default"
59+
* the undeleted resource pool "default" wll be used
60+
*
61+
* @param poolId poolId in ydb
62+
*
63+
* @return builder
64+
*/
65+
public Builder withResourcePool(String poolId) {
66+
this.resourcePool = poolId;
67+
return this;
68+
}
69+
4570
@Override
4671
public ExecuteQuerySettings build() {
4772
return new ExecuteQuerySettings(this);
Lines changed: 230 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,230 @@
1+
package tech.ydb.query.impl;
2+
3+
import org.junit.*;
4+
import org.slf4j.Logger;
5+
import org.slf4j.LoggerFactory;
6+
7+
import tech.ydb.common.transaction.TxMode;
8+
import tech.ydb.core.Result;
9+
import tech.ydb.query.QueryClient;
10+
import tech.ydb.query.QuerySession;
11+
import tech.ydb.query.result.QueryInfo;
12+
import tech.ydb.query.settings.ExecuteQuerySettings;
13+
import tech.ydb.query.settings.QueryExecMode;
14+
import tech.ydb.query.settings.QueryStatsMode;
15+
import tech.ydb.table.SessionRetryContext;
16+
import tech.ydb.table.description.TableDescription;
17+
import tech.ydb.table.impl.SimpleTableClient;
18+
import tech.ydb.table.query.Params;
19+
import tech.ydb.table.rpc.grpc.GrpcTableRpc;
20+
import tech.ydb.table.values.PrimitiveType;
21+
import tech.ydb.test.junit4.GrpcTransportRule;
22+
23+
import java.time.Duration;
24+
25+
/**
26+
* Test on resource poll.
27+
* <p>
28+
* Take an account, that resource poll with name "default" exists every time and can't be deleted
29+
* Also when we specify pool with empty string "" it's equivalent to default pool
30+
* <p>
31+
* Test marked with @Ignore should be uncommented when resource pool disappeared from experimental feature
32+
* <p>
33+
* Until this to run test go to @see tech.ydb.test.integration.YdbEnvironment
34+
* {@link tech.ydb.test.integration.YdbEnvironment}
35+
* dockerFeatures = createParam("YDB_DOCKER_FEATURE_FLAGS", "enable_resource_pools");
36+
* By the way feature available with image ydbplatform/local-ydb:24.3.11.13";
37+
*
38+
* @author Evgeny Kuvardin
39+
*/
40+
public class QueryIntegrationResourcePoolTest {
41+
private final static Logger logger = LoggerFactory.getLogger(QueryIntegrationResourcePoolTest.class);
42+
private final static String TEST_TABLE = "query_resource_pool_service_test";
43+
private final static String TEST_RESOURCE_POOL = "test_pool";
44+
private final static String TEST_RESOURCE_POOL_WITH_DELETE = "test_pool_fot_delete";
45+
46+
47+
@ClassRule
48+
public final static GrpcTransportRule ydbTransport = new GrpcTransportRule();
49+
50+
@BeforeClass
51+
public static void initSchema() {
52+
logger.info("Prepare database...");
53+
54+
String tablePath = ydbTransport.getDatabase() + "/" + TEST_TABLE;
55+
TableDescription tableDescription = TableDescription.newBuilder()
56+
.addNonnullColumn("id", PrimitiveType.Int32)
57+
.addNullableColumn("name", PrimitiveType.Text)
58+
.setPrimaryKey("id")
59+
.build();
60+
61+
SimpleTableClient client = SimpleTableClient.newClient(GrpcTableRpc.useTransport(ydbTransport)).build();
62+
SessionRetryContext retryCtx = SessionRetryContext.create(client).build();
63+
Assert.assertTrue("Table should be created before tests",
64+
retryCtx.supplyStatus(session -> session.createTable(tablePath, tableDescription)).join().isSuccess());
65+
logger.info("Prepare database OK");
66+
67+
}
68+
69+
@AfterClass
70+
public static void dropAll() {
71+
logger.info("Clean database...");
72+
String tablePath = ydbTransport.getDatabase() + "/" + TEST_TABLE;
73+
74+
SimpleTableClient client = SimpleTableClient.newClient(GrpcTableRpc.useTransport(ydbTransport)).build();
75+
SessionRetryContext retryCtx = SessionRetryContext.create(client).build();
76+
retryCtx.supplyStatus(session -> session.dropTable(tablePath)).join();
77+
78+
logger.info("Clean database OK");
79+
}
80+
81+
@Ignore
82+
@Test
83+
public void selectWithResourcePoolTest() {
84+
createResourcePool(TEST_RESOURCE_POOL);
85+
try (QueryClient client = QueryClient.newClient(ydbTransport).build()) {
86+
try (QuerySession session = client.createSession(Duration.ofSeconds(5)).join().getValue()) {
87+
ExecuteQuerySettings settings = ExecuteQuerySettings.newBuilder()
88+
.withExecMode(QueryExecMode.EXECUTE)
89+
.withResourcePool(TEST_RESOURCE_POOL)
90+
.withStatsMode(QueryStatsMode.FULL)
91+
.build();
92+
93+
Assert.assertTrue("Query shouldn't fail",
94+
session.createQuery("SELECT id, name FROM " + TEST_TABLE + " ORDER BY id;", TxMode.SERIALIZABLE_RW, Params.empty(), settings).execute()
95+
.join().isSuccess());
96+
}
97+
} finally {
98+
deleteResourcePool(TEST_RESOURCE_POOL, true);
99+
}
100+
}
101+
102+
@Ignore
103+
@Test
104+
public void selectWithResourcePoolShouldBeCaseSensitiveTest() {
105+
createResourcePool(TEST_RESOURCE_POOL);
106+
try (QueryClient client = QueryClient.newClient(ydbTransport).build()) {
107+
try (QuerySession session = client.createSession(Duration.ofSeconds(5)).join().getValue()) {
108+
ExecuteQuerySettings settings = ExecuteQuerySettings.newBuilder()
109+
.withExecMode(QueryExecMode.EXECUTE)
110+
.withResourcePool(TEST_RESOURCE_POOL.toUpperCase())
111+
.withStatsMode(QueryStatsMode.FULL)
112+
.build();
113+
114+
Assert.assertFalse("Query should fail",
115+
session.createQuery("SELECT id, name FROM " + TEST_TABLE + " ORDER BY id;", TxMode.SERIALIZABLE_RW, Params.empty(), settings).execute()
116+
.join().isSuccess());
117+
}
118+
} finally {
119+
deleteResourcePool(TEST_RESOURCE_POOL, true);
120+
}
121+
}
122+
123+
/**
124+
* Check that we don't cache resource pool in session
125+
*/
126+
@Ignore
127+
@Test
128+
public void selectWithResourcePoolShouldNotCachePoolInSessionTest() {
129+
createResourcePool(TEST_RESOURCE_POOL_WITH_DELETE);
130+
131+
try (QueryClient client = QueryClient.newClient(ydbTransport).build()) {
132+
try (QuerySession session = client.createSession(Duration.ofSeconds(5)).join().getValue()) {
133+
ExecuteQuerySettings settings = ExecuteQuerySettings.newBuilder()
134+
.withExecMode(QueryExecMode.EXECUTE)
135+
.withResourcePool(TEST_RESOURCE_POOL_WITH_DELETE)
136+
.withStatsMode(QueryStatsMode.FULL)
137+
.build();
138+
139+
Assert.assertTrue("Query shouldn't fail",
140+
session.createQuery("SELECT id, name FROM " + TEST_TABLE + " ORDER BY id;", TxMode.SERIALIZABLE_RW, Params.empty(), settings).execute()
141+
.join().isSuccess());
142+
143+
deleteResourcePool(TEST_RESOURCE_POOL_WITH_DELETE, true);
144+
145+
Assert.assertTrue("Query shouldn't cache in session previous call to resource pool",
146+
session.createQuery("SELECT id, name FROM " + TEST_TABLE + " ORDER BY id;", TxMode.SERIALIZABLE_RW, Params.empty()).execute()
147+
.join().isSuccess());
148+
}
149+
} finally {
150+
deleteResourcePool(TEST_RESOURCE_POOL_WITH_DELETE, false);
151+
}
152+
}
153+
154+
155+
@Test
156+
public void selectWithDefaultResourcePoolTest() {
157+
try (QueryClient client = QueryClient.newClient(ydbTransport).build()) {
158+
try (QuerySession session = client.createSession(Duration.ofSeconds(5)).join().getValue()) {
159+
ExecuteQuerySettings settings = ExecuteQuerySettings.newBuilder()
160+
.withExecMode(QueryExecMode.EXECUTE)
161+
.withResourcePool("default")
162+
.withStatsMode(QueryStatsMode.FULL)
163+
.build();
164+
165+
Assert.assertTrue("Query shouldn't fail with default pool name",
166+
session.createQuery("SELECT id, name FROM " + TEST_TABLE + " ORDER BY id;", TxMode.SERIALIZABLE_RW, Params.empty(), settings).execute()
167+
.join().isSuccess());
168+
}
169+
}
170+
}
171+
172+
@Test
173+
public void selectWithDefaultResourcePoolAndEmptyStringTest() {
174+
try (QueryClient client = QueryClient.newClient(ydbTransport).build()) {
175+
try (QuerySession session = client.createSession(Duration.ofSeconds(5)).join().getValue()) {
176+
ExecuteQuerySettings settings = ExecuteQuerySettings.newBuilder()
177+
.withExecMode(QueryExecMode.EXECUTE)
178+
.withResourcePool("")
179+
.build();
180+
181+
Assert.assertTrue("Query shouldn't fail cause empty string equivalent to default pool.",
182+
session.createQuery("SELECT id, name FROM " + TEST_TABLE + " ORDER BY id;", TxMode.SERIALIZABLE_RW, Params.empty(), settings).execute()
183+
.join().isSuccess());
184+
}
185+
}
186+
}
187+
188+
@Ignore
189+
@Test
190+
public void selectShouldFailWithUnknownResourcePollTest() {
191+
try (QueryClient client = QueryClient.newClient(ydbTransport).build()) {
192+
try (QuerySession session = client.createSession(Duration.ofSeconds(5)).join().getValue()) {
193+
ExecuteQuerySettings settings = ExecuteQuerySettings.newBuilder()
194+
.withExecMode(QueryExecMode.EXECUTE)
195+
.withResourcePool("some_unknown_pool")
196+
.withStatsMode(QueryStatsMode.FULL)
197+
.build();
198+
199+
Assert.assertFalse("Query should fail cause poll not exists",
200+
session.createQuery("SELECT id, name FROM " + TEST_TABLE + " ORDER BY id;", TxMode.SERIALIZABLE_RW, Params.empty(), settings).execute()
201+
.join().isSuccess());
202+
}
203+
}
204+
}
205+
206+
private static void createResourcePool(String resourcePoolName) {
207+
try (QueryClient queryClient = QueryClient.newClient(ydbTransport).build()) {
208+
try (QuerySession querySession = queryClient.createSession(Duration.ofSeconds(5)).join().getValue()) {
209+
Result<QueryInfo> result = querySession.createQuery("CREATE RESOURCE POOL " + resourcePoolName + " WITH (\n" +
210+
" CONCURRENT_QUERY_LIMIT=10,\n" +
211+
" QUEUE_SIZE=1000,\n" +
212+
" DATABASE_LOAD_CPU_THRESHOLD=80);", TxMode.NONE).execute().join();
213+
214+
Assert.assertTrue(result.getStatus().toString(), result.isSuccess());
215+
}
216+
}
217+
}
218+
219+
private static void deleteResourcePool(String resourcePoolName, boolean checkError) {
220+
try (QueryClient queryClient = QueryClient.newClient(ydbTransport).build()) {
221+
try (QuerySession querySession = queryClient.createSession(Duration.ofSeconds(5)).join().getValue()) {
222+
Result<QueryInfo> result = querySession.createQuery("DROP RESOURCE POOL " + resourcePoolName + ";", TxMode.NONE).execute().join();
223+
224+
if (checkError) {
225+
Assert.assertTrue(result.getStatus().toString(), result.isSuccess());
226+
}
227+
}
228+
}
229+
}
230+
}

0 commit comments

Comments
 (0)