Skip to content

Commit fe1b8ee

Browse files
committed
Fixed test flapping
1 parent acf9f06 commit fe1b8ee

File tree

4 files changed

+23
-21
lines changed

4 files changed

+23
-21
lines changed

query/src/test/java/tech/ydb/query/impl/GrpcTestInterceptor.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,14 @@
2121
* @author Aleksandr Gorshenin
2222
*/
2323
public class GrpcTestInterceptor implements Consumer<ManagedChannelBuilder<?>>, ClientInterceptor {
24-
private final Queue<Status> nextStatus = new ConcurrentLinkedQueue<>();
24+
private volatile Queue<Status> overrideQueue = new ConcurrentLinkedQueue<>();
2525

2626
public void reset() {
27-
nextStatus.clear();
27+
overrideQueue = new ConcurrentLinkedQueue<>();
2828
}
2929

30-
public void addNextStatus(Status status) {
31-
nextStatus.add(status);
30+
public void addOverrideStatus(Status status) {
31+
overrideQueue.add(status);
3232
}
3333

3434
@Override
@@ -39,15 +39,17 @@ public void accept(ManagedChannelBuilder<?> t) {
3939
@Override
4040
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
4141
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
42-
return new ProxyClientCall<>(next, method, callOptions);
42+
return new ProxyClientCall<>(next, overrideQueue.poll(), method, callOptions);
4343
}
4444

45-
private class ProxyClientCall<ReqT, RespT> extends ClientCall<ReqT, RespT> {
45+
private static class ProxyClientCall<ReqT, RespT> extends ClientCall<ReqT, RespT> {
4646
private final ClientCall<ReqT, RespT> delegate;
47+
private final Status overrided;
4748

48-
private ProxyClientCall(Channel channel, MethodDescriptor<ReqT, RespT> method,
49+
private ProxyClientCall(Channel channel, Status overrided, MethodDescriptor<ReqT, RespT> method,
4950
CallOptions callOptions) {
5051
this.delegate = channel.newCall(method, callOptions);
52+
this.overrided = overrided;
5153
}
5254

5355
@Override
@@ -110,8 +112,7 @@ public void onMessage(RespT message) {
110112

111113
@Override
112114
public void onClose(Status status, Metadata trailers) {
113-
Status next = nextStatus.poll();
114-
delegate.onClose(next != null ? next : status, trailers);
115+
delegate.onClose(overrided != null ? overrided : status, trailers);
115116
}
116117

117118
@Override

query/src/test/java/tech/ydb/query/impl/QueryClientTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ public void sessionExecuteQueryTest() {
9494
QuerySession s1 = getSession();
9595
String id1 = s1.getId();
9696

97-
grpcInterceptor.addNextStatus(io.grpc.Status.UNAVAILABLE);
97+
grpcInterceptor.addOverrideStatus(io.grpc.Status.UNAVAILABLE);
9898

9999
Result<QueryInfo> res = s1.createQuery("SELECT 1 + 2", TxMode.NONE).execute().join();
100100
Assert.assertEquals(StatusCode.TRANSPORT_UNAVAILABLE, res.getStatus().getCode());

table/src/test/java/tech/ydb/table/integration/GrpcTestInterceptor.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,14 @@
2121
* @author Aleksandr Gorshenin
2222
*/
2323
public class GrpcTestInterceptor implements Consumer<ManagedChannelBuilder<?>>, ClientInterceptor {
24-
private final Queue<Status> nextStatus = new ConcurrentLinkedQueue<>();
24+
private volatile Queue<Status> overrideQueue = new ConcurrentLinkedQueue<>();
2525

2626
public void reset() {
27-
nextStatus.clear();
27+
overrideQueue = new ConcurrentLinkedQueue<>();
2828
}
2929

30-
public void addNextStatus(Status status) {
31-
nextStatus.add(status);
30+
public void addOverrideStatus(Status status) {
31+
overrideQueue.add(status);
3232
}
3333

3434
@Override
@@ -39,15 +39,17 @@ public void accept(ManagedChannelBuilder<?> t) {
3939
@Override
4040
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
4141
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
42-
return new ProxyClientCall<>(next, method, callOptions);
42+
return new ProxyClientCall<>(next, overrideQueue.poll(), method, callOptions);
4343
}
4444

45-
private class ProxyClientCall<ReqT, RespT> extends ClientCall<ReqT, RespT> {
45+
private static class ProxyClientCall<ReqT, RespT> extends ClientCall<ReqT, RespT> {
4646
private final ClientCall<ReqT, RespT> delegate;
47+
private final Status overrided;
4748

48-
private ProxyClientCall(Channel channel, MethodDescriptor<ReqT, RespT> method,
49+
private ProxyClientCall(Channel channel, Status overrided, MethodDescriptor<ReqT, RespT> method,
4950
CallOptions callOptions) {
5051
this.delegate = channel.newCall(method, callOptions);
52+
this.overrided = overrided;
5153
}
5254

5355
@Override
@@ -110,8 +112,7 @@ public void onMessage(RespT message) {
110112

111113
@Override
112114
public void onClose(Status status, Metadata trailers) {
113-
Status next = nextStatus.poll();
114-
delegate.onClose(next != null ? next : status, trailers);
115+
delegate.onClose(overrided != null ? overrided : status, trailers);
115116
}
116117

117118
@Override

table/src/test/java/tech/ydb/table/integration/TableClientTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ public void sessionExecuteDataQueryTest() {
9797
Session s1 = getSession();
9898
String id1 = s1.getId();
9999

100-
grpcInterceptor.addNextStatus(io.grpc.Status.UNAVAILABLE);
100+
grpcInterceptor.addOverrideStatus(io.grpc.Status.UNAVAILABLE);
101101

102102
Result<DataQueryResult> res = s1.executeDataQuery("SELECT 1 + 2", TxControl.snapshotRo()).join();
103103
Assert.assertEquals(StatusCode.TRANSPORT_UNAVAILABLE, res.getStatus().getCode());
@@ -128,7 +128,7 @@ public void sessionExecuteScanQueryTest() {
128128
Session s1 = getSession();
129129
String id1 = s1.getId();
130130

131-
grpcInterceptor.addNextStatus(io.grpc.Status.UNAVAILABLE);
131+
grpcInterceptor.addOverrideStatus(io.grpc.Status.UNAVAILABLE);
132132

133133
Status res = s1.executeScanQuery("SELECT 1 + 2", Params.empty(), settings).start(rsr -> {}).join();
134134
Assert.assertEquals(StatusCode.TRANSPORT_UNAVAILABLE, res.getCode());

0 commit comments

Comments
 (0)