Skip to content

Commit 64c9393

Browse files
committed
Added integration tests on session pool behaviour
1 parent 6fa3dfb commit 64c9393

File tree

4 files changed

+520
-0
lines changed

4 files changed

+520
-0
lines changed
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
package tech.ydb.query.impl;
2+
3+
import java.util.Queue;
4+
import java.util.concurrent.ConcurrentLinkedQueue;
5+
import java.util.function.Consumer;
6+
7+
import javax.annotation.Nullable;
8+
9+
import io.grpc.Attributes;
10+
import io.grpc.CallOptions;
11+
import io.grpc.Channel;
12+
import io.grpc.ClientCall;
13+
import io.grpc.ClientInterceptor;
14+
import io.grpc.ManagedChannelBuilder;
15+
import io.grpc.Metadata;
16+
import io.grpc.MethodDescriptor;
17+
import io.grpc.Status;
18+
19+
/**
20+
*
21+
* @author Aleksandr Gorshenin
22+
*/
23+
public class GrpcTestInterceptor implements Consumer<ManagedChannelBuilder<?>>, ClientInterceptor {
24+
private final Queue<Status> nextStatus = new ConcurrentLinkedQueue<>();
25+
26+
public void reset() {
27+
nextStatus.clear();
28+
}
29+
30+
public void addNextStatus(Status status) {
31+
nextStatus.add(status);
32+
}
33+
34+
@Override
35+
public void accept(ManagedChannelBuilder<?> t) {
36+
t.intercept(this);
37+
}
38+
39+
@Override
40+
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
41+
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
42+
return new ProxyClientCall<>(next, method, callOptions);
43+
}
44+
45+
private class ProxyClientCall<ReqT, RespT> extends ClientCall<ReqT, RespT> {
46+
private final ClientCall<ReqT, RespT> delegate;
47+
48+
private ProxyClientCall(Channel channel, MethodDescriptor<ReqT, RespT> method,
49+
CallOptions callOptions) {
50+
this.delegate = channel.newCall(method, callOptions);
51+
}
52+
53+
@Override
54+
public void request(int numMessages) {
55+
delegate.request(numMessages);
56+
}
57+
58+
@Override
59+
public void cancel(@Nullable String message, @Nullable Throwable cause) {
60+
delegate.cancel(message, cause);
61+
}
62+
63+
@Override
64+
public void halfClose() {
65+
delegate.halfClose();
66+
}
67+
68+
@Override
69+
public void setMessageCompression(boolean enabled) {
70+
delegate.setMessageCompression(enabled);
71+
}
72+
73+
@Override
74+
public boolean isReady() {
75+
return delegate.isReady();
76+
}
77+
78+
@Override
79+
public Attributes getAttributes() {
80+
return delegate.getAttributes();
81+
}
82+
83+
@Override
84+
public void start(ClientCall.Listener<RespT> listener, Metadata headers) {
85+
delegate.start(new ProxyListener(listener), headers);
86+
}
87+
88+
@Override
89+
public void sendMessage(ReqT message) {
90+
delegate.sendMessage(message);
91+
}
92+
93+
private class ProxyListener extends ClientCall.Listener<RespT> {
94+
private final ClientCall.Listener<RespT> delegate;
95+
96+
public ProxyListener(ClientCall.Listener<RespT> delegate) {
97+
this.delegate = delegate;
98+
}
99+
100+
101+
@Override
102+
public void onHeaders(Metadata headers) {
103+
delegate.onHeaders(headers);
104+
}
105+
106+
@Override
107+
public void onMessage(RespT message) {
108+
delegate.onMessage(message);
109+
}
110+
111+
@Override
112+
public void onClose(Status status, Metadata trailers) {
113+
Status next = nextStatus.poll();
114+
delegate.onClose(next != null ? next : status, trailers);
115+
}
116+
117+
@Override
118+
public void onReady() {
119+
delegate.onReady();
120+
}
121+
}
122+
}
123+
}
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
package tech.ydb.query.impl;
2+
3+
import java.time.Duration;
4+
5+
import org.junit.After;
6+
import org.junit.AfterClass;
7+
import org.junit.Assert;
8+
import org.junit.Before;
9+
import org.junit.BeforeClass;
10+
import org.junit.ClassRule;
11+
import org.junit.Test;
12+
13+
import tech.ydb.auth.TokenAuthProvider;
14+
import tech.ydb.common.transaction.TxMode;
15+
import tech.ydb.core.Result;
16+
import tech.ydb.core.StatusCode;
17+
import tech.ydb.core.grpc.GrpcTransport;
18+
import tech.ydb.query.QueryClient;
19+
import tech.ydb.query.QuerySession;
20+
import tech.ydb.query.result.QueryInfo;
21+
import tech.ydb.test.junit4.YdbHelperRule;
22+
23+
/**
24+
*
25+
* @author Aleksandr Gorshenin
26+
*/
27+
public class QueryClientTest {
28+
29+
@ClassRule
30+
public static final YdbHelperRule YDB = new YdbHelperRule();
31+
32+
private static final GrpcTestInterceptor grpcInterceptor = new GrpcTestInterceptor();
33+
34+
private static GrpcTransport transport;
35+
36+
private static QueryClient queryClient;
37+
38+
@BeforeClass
39+
public static void initTransport() {
40+
transport = GrpcTransport.forEndpoint(YDB.endpoint(), YDB.database())
41+
.withAuthProvider(new TokenAuthProvider(YDB.authToken()))
42+
.addChannelInitializer(grpcInterceptor)
43+
.build();
44+
}
45+
46+
@AfterClass
47+
public static void closeTransport() {
48+
transport.close();
49+
}
50+
51+
@Before
52+
public void initTableClient() {
53+
grpcInterceptor.reset();
54+
queryClient = QueryClient.newClient(transport).build();
55+
}
56+
57+
@After
58+
public void closeTableClient() {
59+
queryClient.close();
60+
}
61+
62+
private QuerySession getSession() {
63+
return queryClient.createSession(Duration.ofSeconds(5)).join().getValue();
64+
}
65+
66+
@Test
67+
public void sessionReuseTest() {
68+
QuerySession s1 = getSession();
69+
String id1 = s1.getId();
70+
s1.close();
71+
72+
QuerySession s2 = getSession();
73+
Assert.assertEquals(id1, s2.getId());
74+
75+
QuerySession s3 = getSession();
76+
Assert.assertNotEquals(id1, s3.getId());
77+
String id2 = s3.getId();
78+
79+
s2.close();
80+
s3.close();
81+
82+
QuerySession s4 = getSession();
83+
QuerySession s5 = getSession();
84+
85+
Assert.assertEquals(id2, s4.getId());
86+
Assert.assertEquals(id1, s5.getId());
87+
88+
s4.close();
89+
s5.close();
90+
}
91+
92+
@Test
93+
public void sessionExecuteQueryTest() {
94+
QuerySession s1 = getSession();
95+
String id1 = s1.getId();
96+
97+
grpcInterceptor.addNextStatus(io.grpc.Status.UNAVAILABLE);
98+
99+
Result<QueryInfo> res = s1.createQuery("SELECT 1 + 2", TxMode.NONE).execute().join();
100+
Assert.assertEquals(StatusCode.TRANSPORT_UNAVAILABLE, res.getStatus().getCode());
101+
102+
res = s1.createQuery("SELECT 1 + 2", TxMode.NONE).execute().join();
103+
Assert.assertEquals(StatusCode.SUCCESS, res.getStatus().getCode());
104+
105+
s1.close();
106+
107+
QuerySession s2 = getSession();
108+
Assert.assertNotEquals(id1, s2.getId());
109+
String id2 = s2.getId();
110+
111+
res = s2.createQuery("SELECT * FROM wrongTable", TxMode.NONE).execute().join();
112+
Assert.assertEquals(StatusCode.SCHEME_ERROR, res.getStatus().getCode());
113+
114+
s2.close();
115+
116+
try (QuerySession s3 = getSession()) {
117+
Assert.assertEquals(id2, s3.getId());
118+
}
119+
}
120+
}
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
package tech.ydb.table.integration;
2+
3+
import java.util.Queue;
4+
import java.util.concurrent.ConcurrentLinkedQueue;
5+
import java.util.function.Consumer;
6+
7+
import javax.annotation.Nullable;
8+
9+
import io.grpc.Attributes;
10+
import io.grpc.CallOptions;
11+
import io.grpc.Channel;
12+
import io.grpc.ClientCall;
13+
import io.grpc.ClientInterceptor;
14+
import io.grpc.ManagedChannelBuilder;
15+
import io.grpc.Metadata;
16+
import io.grpc.MethodDescriptor;
17+
import io.grpc.Status;
18+
19+
/**
20+
*
21+
* @author Aleksandr Gorshenin
22+
*/
23+
public class GrpcTestInterceptor implements Consumer<ManagedChannelBuilder<?>>, ClientInterceptor {
24+
private final Queue<Status> nextStatus = new ConcurrentLinkedQueue<>();
25+
26+
public void reset() {
27+
nextStatus.clear();
28+
}
29+
30+
public void addNextStatus(Status status) {
31+
nextStatus.add(status);
32+
}
33+
34+
@Override
35+
public void accept(ManagedChannelBuilder<?> t) {
36+
t.intercept(this);
37+
}
38+
39+
@Override
40+
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
41+
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
42+
return new ProxyClientCall<>(next, method, callOptions);
43+
}
44+
45+
private class ProxyClientCall<ReqT, RespT> extends ClientCall<ReqT, RespT> {
46+
private final ClientCall<ReqT, RespT> delegate;
47+
48+
private ProxyClientCall(Channel channel, MethodDescriptor<ReqT, RespT> method,
49+
CallOptions callOptions) {
50+
this.delegate = channel.newCall(method, callOptions);
51+
}
52+
53+
@Override
54+
public void request(int numMessages) {
55+
delegate.request(numMessages);
56+
}
57+
58+
@Override
59+
public void cancel(@Nullable String message, @Nullable Throwable cause) {
60+
delegate.cancel(message, cause);
61+
}
62+
63+
@Override
64+
public void halfClose() {
65+
delegate.halfClose();
66+
}
67+
68+
@Override
69+
public void setMessageCompression(boolean enabled) {
70+
delegate.setMessageCompression(enabled);
71+
}
72+
73+
@Override
74+
public boolean isReady() {
75+
return delegate.isReady();
76+
}
77+
78+
@Override
79+
public Attributes getAttributes() {
80+
return delegate.getAttributes();
81+
}
82+
83+
@Override
84+
public void start(ClientCall.Listener<RespT> listener, Metadata headers) {
85+
delegate.start(new ProxyListener(listener), headers);
86+
}
87+
88+
@Override
89+
public void sendMessage(ReqT message) {
90+
delegate.sendMessage(message);
91+
}
92+
93+
private class ProxyListener extends ClientCall.Listener<RespT> {
94+
private final ClientCall.Listener<RespT> delegate;
95+
96+
public ProxyListener(ClientCall.Listener<RespT> delegate) {
97+
this.delegate = delegate;
98+
}
99+
100+
101+
@Override
102+
public void onHeaders(Metadata headers) {
103+
delegate.onHeaders(headers);
104+
}
105+
106+
@Override
107+
public void onMessage(RespT message) {
108+
delegate.onMessage(message);
109+
}
110+
111+
@Override
112+
public void onClose(Status status, Metadata trailers) {
113+
Status next = nextStatus.poll();
114+
delegate.onClose(next != null ? next : status, trailers);
115+
}
116+
117+
@Override
118+
public void onReady() {
119+
delegate.onReady();
120+
}
121+
}
122+
}
123+
}

0 commit comments

Comments
 (0)