Skip to content

Commit d054ca1

Browse files
committed
trying to improve stability of NettyFlowControlTest
1 parent cdab410 commit d054ca1

File tree

2 files changed

+47
-11
lines changed

2 files changed

+47
-11
lines changed
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright 2025 The gRPC Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.grpc.internal;
18+
19+
import java.util.concurrent.CountDownLatch;
20+
21+
/**
22+
* An extension of {@link java.util.concurrent.CountDownLatch CountDownLatch} to simplify usage when
23+
* there are only 2 states.
24+
*/
25+
public class BooleanLatch extends CountDownLatch {
26+
27+
public BooleanLatch() {
28+
super(1);
29+
}
30+
31+
public boolean isSignaled() {
32+
return getCount() == 0;
33+
}
34+
35+
public void signal() {
36+
countDown();
37+
}
38+
}

interop-testing/src/test/java/io/grpc/testing/integration/NettyFlowControlTest.java

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import io.grpc.ServerBuilder;
2525
import io.grpc.ServerInterceptor;
2626
import io.grpc.ServerInterceptors;
27+
import io.grpc.internal.BooleanLatch;
2728
import io.grpc.netty.GrpcHttp2ConnectionHandler;
2829
import io.grpc.netty.InternalNettyChannelBuilder;
2930
import io.grpc.netty.InternalNettyChannelBuilder.ProtocolNegotiatorFactory;
@@ -41,7 +42,6 @@
4142
import io.netty.util.AsciiString;
4243
import java.io.IOException;
4344
import java.net.InetSocketAddress;
44-
import java.util.concurrent.CountDownLatch;
4545
import java.util.concurrent.Executors;
4646
import java.util.concurrent.TimeUnit;
4747
import java.util.concurrent.atomic.AtomicReference;
@@ -190,10 +190,9 @@ private void startServer(int serverFlowControlWindow) {
190190
private static class TestStreamObserver implements StreamObserver<StreamingOutputCallResponse> {
191191

192192
final AtomicReference<GrpcHttp2ConnectionHandler> grpcHandlerRef;
193-
final CountDownLatch latch = new CountDownLatch(1);
193+
final BooleanLatch completed = new BooleanLatch();
194194
final long expectedWindow;
195-
int lastWindow;
196-
boolean wasCompleted;
195+
volatile int lastWindow;
197196

198197
public TestStreamObserver(
199198
AtomicReference<GrpcHttp2ConnectionHandler> grpcHandlerRef, long window) {
@@ -208,31 +207,30 @@ public void onNext(StreamingOutputCallResponse value) {
208207
int curWindow = grpcHandler.decoder().flowController().initialWindowSize(connectionStream);
209208
synchronized (this) {
210209
if (curWindow >= expectedWindow) {
211-
if (wasCompleted) {
210+
if (completed.isSignaled()) {
212211
return;
213212
}
214-
wasCompleted = true;
215213
lastWindow = curWindow;
216-
onCompleted();
217-
} else if (!wasCompleted) {
214+
completed.signal();
215+
} else if (!completed.isSignaled()) {
218216
lastWindow = curWindow;
219217
}
220218
}
221219
}
222220

223221
@Override
224222
public void onError(Throwable t) {
225-
latch.countDown();
223+
completed.signal();
226224
throw new RuntimeException(t);
227225
}
228226

229227
@Override
230228
public void onCompleted() {
231-
latch.countDown();
229+
completed.signal();
232230
}
233231

234232
public int waitFor(long duration, TimeUnit unit) throws InterruptedException {
235-
latch.await(duration, unit);
233+
assertTrue("should be completed", completed.await(duration, unit));
236234
return lastWindow;
237235
}
238236
}

0 commit comments

Comments
 (0)