Skip to content

Commit 6f773e9

Browse files
fix: wait for termination on shutdown (#19)
* fix: wait for termination ono shutdoown * fix: use error log level for force shutdown * refactor: share deadline for shutdown, expose for caller * docs: add shutdown docs
1 parent c372c51 commit 6f773e9

File tree

7 files changed

+168
-25
lines changed

7 files changed

+168
-25
lines changed

grpc-client-rx-utils/build.gradle.kts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,13 @@ plugins {
77

88
dependencies {
99
api("io.reactivex.rxjava3:rxjava:3.0.6")
10-
api("io.grpc:grpc-stub:1.36.0")
10+
api("io.grpc:grpc-stub:1.40.0")
1111
api(project(":grpc-context-utils"))
12-
implementation("io.grpc:grpc-context:1.36.0")
12+
implementation("io.grpc:grpc-context:1.40.0")
1313

1414
testImplementation("org.junit.jupiter:junit-jupiter:5.7.0")
15-
testImplementation("org.mockito:mockito-core:3.5.11")
16-
testImplementation("org.mockito:mockito-junit-jupiter:3.5.11")
15+
testImplementation("org.mockito:mockito-core:3.12.1")
16+
testImplementation("org.mockito:mockito-junit-jupiter:3.12.1")
1717
}
1818

1919
tasks.test {

grpc-client-utils/build.gradle.kts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,17 @@ dependencies {
1313
// End Logging
1414

1515
// grpc
16-
implementation("io.grpc:grpc-core:1.36.0")
16+
implementation("io.grpc:grpc-core:1.40.0")
1717
constraints {
1818
implementation("com.google.guava:guava:30.0-jre") {
1919
because("https://snyk.io/vuln/SNYK-JAVA-COMGOOGLEGUAVA-1015415")
2020
}
2121
}
2222

2323
testImplementation("org.junit.jupiter:junit-jupiter:5.7.0")
24-
testImplementation("org.mockito:mockito-core:3.4.4")
25-
testRuntimeOnly("io.grpc:grpc-netty:1.36.0")
24+
testImplementation("org.mockito:mockito-core:3.12.1")
25+
testImplementation("org.mockito:mockito-inline:3.12.1")
26+
testRuntimeOnly("io.grpc:grpc-netty:1.40.0")
2627
}
2728

2829
tasks.test {

grpc-client-utils/src/main/java/org/hypertrace/core/grpcutils/client/GrpcChannelRegistry.java

Lines changed: 92 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,33 @@
11
package org.hypertrace.core.grpcutils.client;
22

3+
import static java.time.temporal.ChronoUnit.MINUTES;
4+
import static java.time.temporal.ChronoUnit.SECONDS;
5+
36
import io.grpc.ManagedChannel;
47
import io.grpc.ManagedChannelBuilder;
8+
import java.time.Clock;
9+
import java.time.Instant;
510
import java.util.Map;
611
import java.util.concurrent.ConcurrentHashMap;
12+
import java.util.concurrent.TimeUnit;
713
import org.slf4j.Logger;
814
import org.slf4j.LoggerFactory;
915

1016
public class GrpcChannelRegistry {
1117
private static final Logger LOG = LoggerFactory.getLogger(GrpcChannelRegistry.class);
1218
private final Map<String, ManagedChannel> channelMap = new ConcurrentHashMap<>();
19+
private final Clock clock;
1320
private volatile boolean isShutdown = false;
1421

22+
@Deprecated
23+
public GrpcChannelRegistry() {
24+
this(Clock.systemUTC());
25+
}
26+
27+
public GrpcChannelRegistry(Clock clock) {
28+
this.clock = clock;
29+
}
30+
1531
/**
1632
* Use either {@link #forSecureAddress(String, int)} or {@link #forPlaintextAddress(String, int)}
1733
*/
@@ -49,8 +65,83 @@ private String getChannelId(String host, int port, boolean isPlaintext) {
4965
return securePrefix + ":" + host + ":" + port;
5066
}
5167

68+
/**
69+
* Shuts down channels using a default deadline of 1 minute.
70+
*
71+
* @see #shutdown(Instant)
72+
*/
5273
public void shutdown() {
53-
channelMap.values().forEach(ManagedChannel::shutdown);
74+
this.shutdown(this.clock.instant().plus(1, MINUTES));
75+
}
76+
77+
/**
78+
* Attempts to perform an orderly shutdown of all registered channels before the provided
79+
* deadline, else falling back to a forceful shutdown. The call waits for all shutdowns to
80+
* complete. More specifically, we go through three shutdown phases.
81+
*
82+
* <ol>
83+
* <li>First, we request an orderly shutdown across all registered channels. At this point, no
84+
* new calls will be accepted, but in-flight calls will be given a chance to complete before
85+
* shutting down.
86+
* <li>Next, we sequentially wait for each channel to complete. Although sequential, each
87+
* channel will wait no longer than the provided deadline.
88+
* <li>For any channels that have not shutdown successfully after the previous phase, we will
89+
* forcefully terminate it, cancelling any pending calls. Each channel is given up to 5
90+
* seconds for forceful termination, but should complete close to instantly.
91+
* </ol>
92+
*
93+
* Upon completion, the registry is moved to a shutdown state and the channel references are
94+
* cleared. Attempting to reference any channels from the registry at this point will result in an
95+
* error.
96+
*
97+
* @param deadline Deadline for all channels to complete graceful shutdown.
98+
*/
99+
public void shutdown(Instant deadline) {
100+
channelMap.forEach(this::initiateChannelShutdown);
101+
channelMap.keySet().stream()
102+
.filter(channelId -> !this.waitForGracefulShutdown(channelId, deadline))
103+
.forEach(this::forceShutdown);
104+
54105
this.isShutdown = true;
106+
this.channelMap.clear();
107+
}
108+
109+
private void initiateChannelShutdown(String channelId, ManagedChannel managedChannel) {
110+
LOG.info("Starting shutdown for channel [{}]", channelId);
111+
managedChannel.shutdown();
112+
}
113+
114+
private boolean waitForGracefulShutdown(String channelId, Instant deadline) {
115+
boolean successfullyShutdown = this.waitForTermination(channelId, deadline);
116+
if (successfullyShutdown) {
117+
LOG.info("Shutdown channel successfully [{}]", channelId);
118+
}
119+
return successfullyShutdown;
120+
}
121+
122+
private void forceShutdown(String channelId) {
123+
LOG.error("Shutting down channel [{}] forcefully", channelId);
124+
this.channelMap.get(channelId).shutdownNow();
125+
Instant forceShutdownDeadline = this.clock.instant().plus(5, SECONDS);
126+
if (this.waitForTermination(channelId, forceShutdownDeadline)) {
127+
LOG.error("Forced channel [{}] shutdown successful", channelId);
128+
} else {
129+
LOG.error("Unable to force channel [{}] shutdown in 5s - giving up!", channelId);
130+
}
131+
}
132+
133+
private boolean waitForTermination(String channelId, Instant deadline) {
134+
ManagedChannel managedChannel = this.channelMap.get(channelId);
135+
long millisRemaining = Math.max(0, deadline.toEpochMilli() - this.clock.millis());
136+
try {
137+
if (!managedChannel.awaitTermination(millisRemaining, TimeUnit.MILLISECONDS)) {
138+
LOG.error("Channel [{}] did not shut down after waiting", channelId);
139+
}
140+
} catch (InterruptedException ex) {
141+
LOG.error(
142+
"There has been an interruption while waiting for channel [{}] to shutdown", channelId);
143+
Thread.currentThread().interrupt();
144+
}
145+
return managedChannel.isTerminated();
55146
}
56147
}

grpc-client-utils/src/test/java/org/hypertrace/core/grpcutils/client/GrpcChannelRegistryTest.java

Lines changed: 62 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,36 @@
11
package org.hypertrace.core.grpcutils.client;
22

3-
import static org.junit.jupiter.api.Assertions.assertFalse;
43
import static org.junit.jupiter.api.Assertions.assertNotNull;
54
import static org.junit.jupiter.api.Assertions.assertNotSame;
65
import static org.junit.jupiter.api.Assertions.assertSame;
76
import static org.junit.jupiter.api.Assertions.assertThrows;
8-
import static org.junit.jupiter.api.Assertions.assertTrue;
7+
import static org.mockito.ArgumentMatchers.anyInt;
8+
import static org.mockito.ArgumentMatchers.anyString;
9+
import static org.mockito.Mockito.inOrder;
10+
import static org.mockito.Mockito.mock;
11+
import static org.mockito.Mockito.verifyNoInteractions;
12+
import static org.mockito.Mockito.when;
913

1014
import io.grpc.Channel;
1115
import io.grpc.ManagedChannel;
16+
import io.grpc.ManagedChannelBuilder;
17+
import java.time.Clock;
18+
import java.time.Instant;
19+
import java.time.ZoneOffset;
20+
import java.util.concurrent.TimeUnit;
1221
import org.junit.jupiter.api.BeforeEach;
1322
import org.junit.jupiter.api.Test;
23+
import org.mockito.InOrder;
24+
import org.mockito.MockedStatic;
25+
import org.mockito.Mockito;
1426

1527
class GrpcChannelRegistryTest {
1628

1729
GrpcChannelRegistry channelRegistry;
1830

1931
@BeforeEach
2032
void beforeEach() {
21-
this.channelRegistry = new GrpcChannelRegistry();
33+
this.channelRegistry = new GrpcChannelRegistry(Clock.systemUTC());
2234
}
2335

2436
@Test
@@ -40,14 +52,53 @@ void reusesChannelsForDuplicateRequests() {
4052
}
4153

4254
@Test
43-
void shutdownAllChannelsOnShutdown() {
44-
ManagedChannel firstChannel = this.channelRegistry.forPlaintextAddress("foo", 1000);
45-
ManagedChannel secondChannel = this.channelRegistry.forSecureAddress("foo", 1002);
46-
assertFalse(firstChannel.isShutdown());
47-
assertFalse(secondChannel.isShutdown());
48-
this.channelRegistry.shutdown();
49-
assertTrue(firstChannel.isShutdown());
50-
assertTrue(secondChannel.isShutdown());
55+
void shutdownAllChannelsOnShutdown() throws InterruptedException {
56+
this.channelRegistry =
57+
new GrpcChannelRegistry(Clock.fixed(Instant.ofEpochMilli(0), ZoneOffset.UTC));
58+
try (MockedStatic<ManagedChannelBuilder> mockedBuilderStatic =
59+
Mockito.mockStatic(ManagedChannelBuilder.class)) {
60+
61+
mockedBuilderStatic
62+
.when(() -> ManagedChannelBuilder.forAddress(anyString(), anyInt()))
63+
.thenAnswer(
64+
invocation -> {
65+
ManagedChannelBuilder<?> mockBuilder = mock(ManagedChannelBuilder.class);
66+
when(mockBuilder.build()).thenReturn(mock(ManagedChannel.class));
67+
return mockBuilder;
68+
});
69+
70+
ManagedChannel firstChannel = this.channelRegistry.forPlaintextAddress("foo", 1000);
71+
ManagedChannel secondChannel = this.channelRegistry.forSecureAddress("foo", 1002);
72+
73+
verifyNoInteractions(firstChannel);
74+
verifyNoInteractions(secondChannel);
75+
76+
// First channel shuts down successfully
77+
when(firstChannel.isTerminated()).thenReturn(true);
78+
// Second does not
79+
when(secondChannel.isTerminated()).thenReturn(false);
80+
81+
// Wait for 10ms (test clock fixed at 0)
82+
this.channelRegistry.shutdown(Instant.ofEpochMilli(10));
83+
84+
// First channel requests shutdown, waits, succeeds and checks result
85+
InOrder firstChannelVerifier = inOrder(firstChannel);
86+
firstChannelVerifier.verify(firstChannel).shutdown();
87+
firstChannelVerifier.verify(firstChannel).awaitTermination(10, TimeUnit.MILLISECONDS);
88+
firstChannelVerifier.verify(firstChannel).isTerminated();
89+
firstChannelVerifier.verifyNoMoreInteractions();
90+
91+
// Second channel requests shutdown, waits, fails, checks result, forces shutdown, waits,
92+
// fails and checks result again
93+
InOrder secondChannelVerifier = inOrder(secondChannel);
94+
secondChannelVerifier.verify(secondChannel).shutdown();
95+
secondChannelVerifier.verify(secondChannel).awaitTermination(10, TimeUnit.MILLISECONDS);
96+
secondChannelVerifier.verify(secondChannel).isTerminated();
97+
secondChannelVerifier.verify(secondChannel).shutdownNow();
98+
// hardcoded 5s for force shutdown
99+
secondChannelVerifier.verify(secondChannel).awaitTermination(5000, TimeUnit.MILLISECONDS);
100+
secondChannelVerifier.verify(secondChannel).isTerminated();
101+
}
51102
}
52103

53104
@Test

grpc-context-utils/build.gradle.kts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ tasks.test {
1111

1212
dependencies {
1313
// grpc
14-
implementation("io.grpc:grpc-core:1.36.0")
14+
implementation("io.grpc:grpc-core:1.40.0")
1515

1616
implementation("com.auth0:java-jwt:3.14.0")
1717
implementation("com.auth0:jwks-rsa:0.17.0")
@@ -22,5 +22,5 @@ dependencies {
2222
// End Logging
2323

2424
testImplementation("org.junit.jupiter:junit-jupiter:5.7.0")
25-
testImplementation("org.mockito:mockito-core:3.8.0")
25+
testImplementation("org.mockito:mockito-core:3.12.1")
2626
}

grpc-server-rx-utils/build.gradle.kts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ plugins {
77

88
dependencies {
99
api("io.reactivex.rxjava3:rxjava:3.0.6")
10-
api("io.grpc:grpc-stub:1.36.0")
10+
api("io.grpc:grpc-stub:1.40.0")
1111

1212
annotationProcessor("org.projectlombok:lombok:1.18.18")
1313
compileOnly("org.projectlombok:lombok:1.18.18")
@@ -20,8 +20,8 @@ dependencies {
2020
}
2121
}
2222
testImplementation("org.junit.jupiter:junit-jupiter:5.7.0")
23-
testImplementation("org.mockito:mockito-core:3.5.11")
24-
testImplementation("org.mockito:mockito-junit-jupiter:3.5.11")
23+
testImplementation("org.mockito:mockito-core:3.12.1")
24+
testImplementation("org.mockito:mockito-junit-jupiter:3.12.1")
2525
}
2626

2727
tasks.test {

grpc-server-utils/build.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ dependencies {
1717
// End Logging
1818

1919
// grpc
20-
implementation("io.grpc:grpc-core:1.36.0")
20+
implementation("io.grpc:grpc-core:1.40.0")
2121
constraints {
2222
implementation("com.google.guava:guava:30.0-jre") {
2323
because("https://snyk.io/vuln/SNYK-JAVA-COMGOOGLEGUAVA-1015415")

0 commit comments

Comments
 (0)