Skip to content

Commit 46a1840

Browse files
committed
Add shutdown tests with real calls
1 parent 46ecee3 commit 46a1840

File tree

3 files changed

+471
-0
lines changed

3 files changed

+471
-0
lines changed
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* Copyright (c) 2016-2021 Michael Zhang <[email protected]>
3+
*
4+
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
5+
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
6+
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
7+
* permit persons to whom the Software is furnished to do so, subject to the following conditions:
8+
*
9+
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of the
10+
* Software.
11+
*
12+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
13+
* WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
14+
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
15+
* OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
16+
*/
17+
18+
package net.devh.boot.grpc.test.server;
19+
20+
import static java.time.Duration.ofMillis;
21+
import static java.time.Duration.ofSeconds;
22+
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
23+
import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
24+
25+
import java.time.Duration;
26+
import java.util.concurrent.ArrayBlockingQueue;
27+
import java.util.concurrent.BlockingQueue;
28+
29+
import com.google.protobuf.Empty;
30+
31+
import io.grpc.stub.StreamObserver;
32+
import net.devh.boot.grpc.test.proto.SomeType;
33+
import net.devh.boot.grpc.test.proto.TestServiceGrpc.TestServiceImplBase;
34+
35+
/**
36+
* A test service implementation that spends a configurable amount of time processing a request.
37+
*/
38+
public class WaitingTestService extends TestServiceImplBase {
39+
40+
// Queue length = 1 -> Ability to control/await server calls
41+
private final BlockingQueue<Long> delays = new ArrayBlockingQueue<>(1);
42+
43+
/**
44+
* The next call will wait the configured amount of time before completing. Allows exactly one call to process. May
45+
* only queue up to one call.
46+
*
47+
* @param delay The delay to wait for.
48+
*/
49+
public synchronized void nextDelay(final Duration delay) {
50+
assertTimeoutPreemptively(
51+
ofSeconds(1),
52+
() -> assertDoesNotThrow(() -> this.delays.put(delay.toMillis())),
53+
"Failed to queue delay");
54+
}
55+
56+
/**
57+
* Waits until all request have started processing on the server.
58+
*/
59+
public synchronized void awaitAllRequestsArrived() {
60+
// Just try to set a value
61+
nextDelay(ofMillis(-1));
62+
this.delays.clear();
63+
}
64+
65+
@Override
66+
public void normal(final Empty request, final StreamObserver<SomeType> responseObserver) {
67+
// Simulate processing time
68+
assertDoesNotThrow(this::sleep);
69+
responseObserver.onNext(SomeType.getDefaultInstance());
70+
responseObserver.onCompleted();
71+
}
72+
73+
private void sleep() throws InterruptedException {
74+
final long delay = assertTimeoutPreemptively(ofSeconds(1), () -> this.delays.take());
75+
if (delay <= 0) {
76+
throw new IllegalStateException("Bad delay: " + delay);
77+
}
78+
Thread.sleep(delay);
79+
}
80+
81+
}
Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
/*
2+
* Copyright (c) 2016-2021 Michael Zhang <[email protected]>
3+
*
4+
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
5+
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
6+
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
7+
* permit persons to whom the Software is furnished to do so, subject to the following conditions:
8+
*
9+
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of the
10+
* Software.
11+
*
12+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
13+
* WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
14+
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
15+
* OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
16+
*/
17+
18+
package net.devh.boot.grpc.test.shutdown;
19+
20+
import static io.grpc.Status.Code.UNAVAILABLE;
21+
import static java.time.Duration.ZERO;
22+
import static java.time.Duration.ofMillis;
23+
import static java.util.concurrent.TimeUnit.MILLISECONDS;
24+
import static net.devh.boot.grpc.test.util.FutureAssertions.assertFutureEquals;
25+
import static net.devh.boot.grpc.test.util.GrpcAssertions.assertFutureThrowsStatus;
26+
import static org.assertj.core.api.Assertions.assertThat;
27+
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
28+
import static org.junit.jupiter.api.Assertions.assertTimeout;
29+
import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
30+
31+
import java.io.IOException;
32+
import java.time.Duration;
33+
import java.util.concurrent.Future;
34+
import java.util.concurrent.atomic.AtomicReference;
35+
36+
import org.junit.jupiter.api.AfterAll;
37+
import org.junit.jupiter.api.BeforeAll;
38+
import org.junit.jupiter.api.Test;
39+
import org.junit.jupiter.api.function.Executable;
40+
import org.junit.jupiter.api.function.ThrowingConsumer;
41+
import org.springframework.context.support.GenericApplicationContext;
42+
43+
import com.google.protobuf.Empty;
44+
45+
import io.grpc.Channel;
46+
import io.grpc.Server;
47+
import io.grpc.Status;
48+
import io.grpc.inprocess.InProcessServerBuilder;
49+
import net.devh.boot.grpc.client.channelfactory.GrpcChannelFactory;
50+
import net.devh.boot.grpc.client.channelfactory.InProcessChannelFactory;
51+
import net.devh.boot.grpc.client.config.GrpcChannelsProperties;
52+
import net.devh.boot.grpc.client.interceptor.GlobalClientInterceptorRegistry;
53+
import net.devh.boot.grpc.test.proto.SomeType;
54+
import net.devh.boot.grpc.test.proto.TestServiceGrpc;
55+
import net.devh.boot.grpc.test.proto.TestServiceGrpc.TestServiceFutureStub;
56+
import net.devh.boot.grpc.test.server.WaitingTestService;
57+
58+
/**
59+
* Tests for {@link InProcessChannelFactory}'s shutdown behavior using an in-process server/channel.
60+
*/
61+
class GrpcChannelLifecycleWithCallsTest {
62+
63+
private static final int A_BIT_TIME = 100;
64+
private static final Empty EMPTY = Empty.getDefaultInstance();
65+
static final SomeType SOME_TYPE = SomeType.getDefaultInstance();
66+
67+
private static final WaitingTestService service = new WaitingTestService();
68+
private static final Server server = InProcessServerBuilder.forName("test")
69+
.addService(service)
70+
.build();
71+
private static final GenericApplicationContext applicationContext = new GenericApplicationContext();
72+
73+
@BeforeAll
74+
static void beforeAll() throws IOException {
75+
applicationContext.refresh();
76+
// Init classes before the actual test, because that is somewhat slow
77+
applicationContext.start();
78+
server.start();
79+
withStub(ZERO, stub -> {
80+
81+
service.nextDelay(ofMillis(1));
82+
stub.normal(EMPTY);
83+
84+
service.awaitAllRequestsArrived();
85+
86+
});
87+
}
88+
89+
@AfterAll
90+
static void afterAll() {
91+
server.shutdownNow();
92+
applicationContext.close();
93+
}
94+
95+
@Test
96+
void testZeroShutdownGracePeriod() {
97+
98+
final AtomicReference<Future<SomeType>> request = new AtomicReference<>();
99+
100+
assertTimeoutPreemptively(ofMillis(A_BIT_TIME),
101+
runWithStub(ZERO, stub -> {
102+
103+
// Send request (Takes 5s)
104+
service.nextDelay(ofMillis(5000));
105+
request.set(stub.normal(EMPTY));
106+
107+
service.awaitAllRequestsArrived();
108+
109+
}));
110+
111+
// The request did not complete in time
112+
assertFailedShutdown(request);
113+
}
114+
115+
@Test
116+
void testShortShutdownGracePeriod() {
117+
118+
final AtomicReference<Future<SomeType>> request1 = new AtomicReference<>();
119+
final AtomicReference<Future<SomeType>> request2 = new AtomicReference<>();
120+
final AtomicReference<Future<SomeType>> request3 = new AtomicReference<>();
121+
122+
assertTimeout(ofMillis(2000 + A_BIT_TIME),
123+
runWithStub(ofMillis(2000), stub -> {
124+
125+
// Send first request (Takes 1s)
126+
service.nextDelay(ofMillis(1000));
127+
request1.set(stub.normal(EMPTY));
128+
129+
// Send second request (Takes 5s)
130+
service.nextDelay(ofMillis(5000));
131+
request2.set(stub.normal(EMPTY));
132+
133+
// Send last request (Takes 1s)
134+
service.nextDelay(ofMillis(1000));
135+
request3.set(stub.normal(EMPTY));
136+
137+
service.awaitAllRequestsArrived();
138+
139+
}));
140+
141+
// First one completed
142+
assertCompleted(request1);
143+
144+
// The second one did not complete in time
145+
assertFailedShutdown(request2);
146+
147+
// Last one completed
148+
assertCompleted(request3);
149+
}
150+
151+
@Test
152+
void testInfiniteShutdownGracePeriod() {
153+
154+
final AtomicReference<Future<SomeType>> request = new AtomicReference<>();
155+
156+
assertTimeoutPreemptively(ofMillis(1000 + A_BIT_TIME),
157+
runWithStub(ofMillis(-1), stub -> {
158+
159+
// Send request (Takes 1s)
160+
service.nextDelay(ofMillis(1000));
161+
request.set(stub.normal(EMPTY));
162+
163+
service.awaitAllRequestsArrived();
164+
165+
}));
166+
167+
// Request completed
168+
assertCompleted(request);
169+
}
170+
171+
static Executable runWithStub(final Duration shutdownGracePeriod,
172+
final ThrowingConsumer<TestServiceFutureStub> executuable) {
173+
return () -> withStub(shutdownGracePeriod, executuable);
174+
}
175+
176+
static void withStub(final Duration shutdownGracePeriod,
177+
final ThrowingConsumer<TestServiceFutureStub> executuable) {
178+
179+
final GrpcChannelsProperties properties = new GrpcChannelsProperties();
180+
properties.getGlobalChannel().setShutdownGracePeriod(shutdownGracePeriod);
181+
try (final GrpcChannelFactory factory = new InProcessChannelFactory(properties,
182+
new GlobalClientInterceptorRegistry(applicationContext))) {
183+
184+
final Channel channel = factory.createChannel("test");
185+
final TestServiceFutureStub stub = TestServiceGrpc.newFutureStub(channel);
186+
assertDoesNotThrow(() -> executuable.accept(stub));
187+
}
188+
}
189+
190+
private <T> void assertFailedShutdown(final AtomicReference<Future<T>> request) {
191+
final Status status = assertFutureThrowsStatus(UNAVAILABLE, request.get(), A_BIT_TIME, MILLISECONDS);
192+
assertThat(status.getDescription()).contains("shutdownNow");
193+
}
194+
195+
private void assertCompleted(final AtomicReference<Future<SomeType>> request) {
196+
assertFutureEquals(SOME_TYPE, request.get(), A_BIT_TIME, MILLISECONDS);
197+
}
198+
199+
}

0 commit comments

Comments
 (0)