Skip to content

Commit cb552a0

Browse files
refactor: work in deadlines (#21)
BREAKING CHANGE: changed new apis added in last release before they're adopted
1 parent 3a96620 commit cb552a0

File tree

6 files changed

+67
-77
lines changed

6 files changed

+67
-77
lines changed

grpc-client-utils/build.gradle.kts

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,19 +6,11 @@ plugins {
66
}
77

88
dependencies {
9-
implementation(project(":grpc-context-utils"))
9+
api("io.grpc:grpc-context:1.40.0")
10+
api("io.grpc:grpc-api:1.40.0")
1011

11-
// Logging
12+
implementation(project(":grpc-context-utils"))
1213
implementation("org.slf4j:slf4j-api:1.7.30")
13-
// End Logging
14-
15-
// grpc
16-
implementation("io.grpc:grpc-core:1.40.0")
17-
constraints {
18-
implementation("com.google.guava:guava:30.0-jre") {
19-
because("https://snyk.io/vuln/SNYK-JAVA-COMGOOGLEGUAVA-1015415")
20-
}
21-
}
2214

2315
testImplementation("org.junit.jupiter:junit-jupiter:5.7.0")
2416
testImplementation("org.mockito:mockito-core:3.12.1")

grpc-client-utils/src/main/java/org/hypertrace/core/grpcutils/client/GrpcChannelRegistry.java

Lines changed: 11 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,10 @@
11
package org.hypertrace.core.grpcutils.client;
22

3-
import static java.time.temporal.ChronoUnit.MINUTES;
4-
import static java.time.temporal.ChronoUnit.SECONDS;
3+
import static java.util.concurrent.TimeUnit.MILLISECONDS;
54

5+
import io.grpc.Deadline;
66
import io.grpc.ManagedChannel;
77
import io.grpc.ManagedChannelBuilder;
8-
import java.time.Clock;
9-
import java.time.Instant;
108
import java.util.Map;
119
import java.util.concurrent.ConcurrentHashMap;
1210
import java.util.concurrent.TimeUnit;
@@ -16,18 +14,8 @@
1614
public class GrpcChannelRegistry {
1715
private static final Logger LOG = LoggerFactory.getLogger(GrpcChannelRegistry.class);
1816
private final Map<String, ManagedChannel> channelMap = new ConcurrentHashMap<>();
19-
private final Clock clock;
2017
private volatile boolean isShutdown = false;
2118

22-
@Deprecated
23-
public GrpcChannelRegistry() {
24-
this(Clock.systemUTC());
25-
}
26-
27-
public GrpcChannelRegistry(Clock clock) {
28-
this.clock = clock;
29-
}
30-
3119
/**
3220
* Use either {@link #forSecureAddress(String, int)} or {@link #forPlaintextAddress(String, int)}
3321
*/
@@ -68,10 +56,10 @@ private String getChannelId(String host, int port, boolean isPlaintext) {
6856
/**
6957
* Shuts down channels using a default deadline of 1 minute.
7058
*
71-
* @see #shutdown(Instant)
59+
* @see #shutdown(Deadline)
7260
*/
7361
public void shutdown() {
74-
this.shutdown(this.clock.instant().plus(1, MINUTES));
62+
this.shutdown(Deadline.after(1, TimeUnit.MINUTES));
7563
}
7664

7765
/**
@@ -96,11 +84,11 @@ public void shutdown() {
9684
*
9785
* @param deadline Deadline for all channels to complete graceful shutdown.
9886
*/
99-
public void shutdown(Instant deadline) {
87+
public void shutdown(Deadline deadline) {
10088
channelMap.forEach(this::initiateChannelShutdown);
10189
channelMap.keySet().stream()
10290
.filter(channelId -> !this.waitForGracefulShutdown(channelId, deadline))
103-
.forEach(this::forceShutdown);
91+
.forEach(channelId -> this.forceShutdown(channelId, deadline.offset(5, TimeUnit.SECONDS)));
10492

10593
this.isShutdown = true;
10694
this.channelMap.clear();
@@ -111,30 +99,28 @@ private void initiateChannelShutdown(String channelId, ManagedChannel managedCha
11199
managedChannel.shutdown();
112100
}
113101

114-
private boolean waitForGracefulShutdown(String channelId, Instant deadline) {
102+
private boolean waitForGracefulShutdown(String channelId, Deadline deadline) {
115103
boolean successfullyShutdown = this.waitForTermination(channelId, deadline);
116104
if (successfullyShutdown) {
117105
LOG.info("Shutdown channel successfully [{}]", channelId);
118106
}
119107
return successfullyShutdown;
120108
}
121109

122-
private void forceShutdown(String channelId) {
110+
private void forceShutdown(String channelId, Deadline deadline) {
123111
LOG.error("Shutting down channel [{}] forcefully", channelId);
124112
this.channelMap.get(channelId).shutdownNow();
125-
Instant forceShutdownDeadline = this.clock.instant().plus(5, SECONDS);
126-
if (this.waitForTermination(channelId, forceShutdownDeadline)) {
113+
if (this.waitForTermination(channelId, deadline)) {
127114
LOG.error("Forced channel [{}] shutdown successful", channelId);
128115
} else {
129116
LOG.error("Unable to force channel [{}] shutdown in 5s - giving up!", channelId);
130117
}
131118
}
132119

133-
private boolean waitForTermination(String channelId, Instant deadline) {
120+
private boolean waitForTermination(String channelId, Deadline deadline) {
134121
ManagedChannel managedChannel = this.channelMap.get(channelId);
135-
long millisRemaining = Math.max(0, deadline.toEpochMilli() - this.clock.millis());
136122
try {
137-
if (!managedChannel.awaitTermination(millisRemaining, TimeUnit.MILLISECONDS)) {
123+
if (!managedChannel.awaitTermination(deadline.timeRemaining(MILLISECONDS), MILLISECONDS)) {
138124
LOG.error("Channel [{}] did not shut down after waiting", channelId);
139125
}
140126
} catch (InterruptedException ex) {

grpc-client-utils/src/test/java/org/hypertrace/core/grpcutils/client/GrpcChannelRegistryTest.java

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,10 @@
1212
import static org.mockito.Mockito.when;
1313

1414
import io.grpc.Channel;
15+
import io.grpc.Deadline;
16+
import io.grpc.Deadline.Ticker;
1517
import io.grpc.ManagedChannel;
1618
import io.grpc.ManagedChannelBuilder;
17-
import java.time.Clock;
18-
import java.time.Instant;
19-
import java.time.ZoneOffset;
2019
import java.util.concurrent.TimeUnit;
2120
import org.junit.jupiter.api.BeforeEach;
2221
import org.junit.jupiter.api.Test;
@@ -30,7 +29,7 @@ class GrpcChannelRegistryTest {
3029

3130
@BeforeEach
3231
void beforeEach() {
33-
this.channelRegistry = new GrpcChannelRegistry(Clock.systemUTC());
32+
this.channelRegistry = new GrpcChannelRegistry();
3433
}
3534

3635
@Test
@@ -51,10 +50,10 @@ void reusesChannelsForDuplicateRequests() {
5150
assertNotSame(firstChannelSecure, this.channelRegistry.forSecureAddress("bar", 1000));
5251
}
5352

53+
@SuppressWarnings("rawtypes")
5454
@Test
5555
void shutdownAllChannelsOnShutdown() throws InterruptedException {
56-
this.channelRegistry =
57-
new GrpcChannelRegistry(Clock.fixed(Instant.ofEpochMilli(0), ZoneOffset.UTC));
56+
this.channelRegistry = new GrpcChannelRegistry();
5857
try (MockedStatic<ManagedChannelBuilder> mockedBuilderStatic =
5958
Mockito.mockStatic(ManagedChannelBuilder.class)) {
6059

@@ -78,8 +77,12 @@ void shutdownAllChannelsOnShutdown() throws InterruptedException {
7877
// Second does not
7978
when(secondChannel.isTerminated()).thenReturn(false);
8079

80+
Ticker mockTicker = mock(Ticker.class);
81+
82+
when(mockTicker.nanoTime()).thenReturn(0L);
83+
8184
// Wait for 10ms (test clock fixed at 0)
82-
this.channelRegistry.shutdown(Instant.ofEpochMilli(10));
85+
this.channelRegistry.shutdown(Deadline.after(10, TimeUnit.MILLISECONDS, mockTicker));
8386

8487
// First channel requests shutdown, waits, succeeds and checks result
8588
InOrder firstChannelVerifier = inOrder(firstChannel);
@@ -95,8 +98,8 @@ void shutdownAllChannelsOnShutdown() throws InterruptedException {
9598
secondChannelVerifier.verify(secondChannel).awaitTermination(10, TimeUnit.MILLISECONDS);
9699
secondChannelVerifier.verify(secondChannel).isTerminated();
97100
secondChannelVerifier.verify(secondChannel).shutdownNow();
98-
// hardcoded 5s for force shutdown
99-
secondChannelVerifier.verify(secondChannel).awaitTermination(5000, TimeUnit.MILLISECONDS);
101+
// hardcoded additional 5s for force shutdown
102+
secondChannelVerifier.verify(secondChannel).awaitTermination(5010, TimeUnit.MILLISECONDS);
100103
secondChannelVerifier.verify(secondChannel).isTerminated();
101104
}
102105
}

grpc-server-utils/build.gradle.kts

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -10,19 +10,11 @@ tasks.test {
1010
}
1111

1212
dependencies {
13-
implementation(project(":grpc-context-utils"))
13+
api("io.grpc:grpc-context:1.40.0")
14+
api("io.grpc:grpc-api:1.40.0")
1415

15-
// Logging
16+
implementation(project(":grpc-context-utils"))
1617
implementation("org.slf4j:slf4j-api:1.7.30")
17-
// End Logging
18-
19-
// grpc
20-
implementation("io.grpc:grpc-core:1.40.0")
21-
constraints {
22-
implementation("com.google.guava:guava:30.0-jre") {
23-
because("https://snyk.io/vuln/SNYK-JAVA-COMGOOGLEGUAVA-1015415")
24-
}
25-
}
2618

2719
annotationProcessor("org.projectlombok:lombok:1.18.20")
2820
compileOnly("org.projectlombok:lombok:1.18.20")

grpc-server-utils/src/main/java/org/hypertrace/core/grpcutils/server/ServerManagementUtil.java

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,43 +1,56 @@
11
package org.hypertrace.core.grpcutils.server;
22

3+
import static java.util.concurrent.TimeUnit.MILLISECONDS;
4+
import static java.util.concurrent.TimeUnit.SECONDS;
5+
6+
import io.grpc.Deadline;
37
import io.grpc.Server;
4-
import java.time.Duration;
5-
import java.util.concurrent.TimeUnit;
68
import lombok.extern.slf4j.Slf4j;
79

810
@Slf4j
911
public class ServerManagementUtil {
1012

11-
public static void shutdownServer(Server grpcServer, String name, Duration timeout) {
13+
/**
14+
* Attempts to gracefully shutdown the server by the provided deadline. If unsuccessful, it then
15+
* uses up to 5 additional seconds to forcefully shutdown the server, returning with the final
16+
* success status.
17+
*
18+
* @param grpcServer
19+
* @param name
20+
* @param deadline
21+
* @return boolean - true if the server is terminated successfully, false otherwise
22+
*/
23+
public static boolean shutdownServer(Server grpcServer, String name, Deadline deadline) {
1224
log.info("Starting shutdown for service [{}]", name);
1325
grpcServer.shutdown();
14-
boolean gracefullyShutdown = waitForGracefulShutdown(grpcServer, name, timeout);
15-
if (!gracefullyShutdown) {
16-
forceShutdown(grpcServer, name);
17-
}
26+
return waitForGracefulShutdown(grpcServer, name, deadline)
27+
|| forceShutdown(grpcServer, name, deadline.offset(5, SECONDS));
1828
}
1929

20-
private static boolean waitForGracefulShutdown(Server grpcServer, String name, Duration timeout) {
21-
boolean successfullyShutdown = waitForTermination(grpcServer, name, timeout);
30+
private static boolean waitForGracefulShutdown(
31+
Server grpcServer, String name, Deadline deadline) {
32+
boolean successfullyShutdown = waitForTermination(grpcServer, name, deadline);
2233
if (successfullyShutdown) {
2334
log.info("Shutdown service successfully [{}]", name);
2435
}
2536
return successfullyShutdown;
2637
}
2738

28-
private static void forceShutdown(Server grpcServer, String name) {
39+
private static boolean forceShutdown(Server grpcServer, String name, Deadline deadline) {
2940
log.error("Shutting down service [{}] forcefully", name);
3041
grpcServer.shutdownNow();
31-
if (waitForTermination(grpcServer, name, Duration.ofSeconds(5))) {
42+
boolean successfullyShutdown = waitForTermination(grpcServer, name, deadline);
43+
if (successfullyShutdown) {
3244
log.error("Forced service [{}] shutdown successful", name);
3345
} else {
3446
log.error("Unable to force service [{}] shutdown in 5s - giving up!", name);
3547
}
48+
return successfullyShutdown;
3649
}
3750

38-
private static boolean waitForTermination(Server grpcServer, String name, Duration deadline) {
51+
private static boolean waitForTermination(Server grpcServer, String name, Deadline deadline) {
3952
try {
40-
if (!grpcServer.awaitTermination(deadline.toMillis(), TimeUnit.MILLISECONDS)) {
53+
if (!grpcServer.awaitTermination(deadline.timeRemaining(MILLISECONDS), MILLISECONDS)) {
4154
log.error("Service [{}] did not shut down after waiting", name);
4255
}
4356
} catch (InterruptedException ex) {

grpc-server-utils/src/test/java/org/hypertrace/core/grpcutils/server/ServerManagementUtilTest.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
package org.hypertrace.core.grpcutils.server;
22

3+
import static java.util.concurrent.TimeUnit.MILLISECONDS;
34
import static org.mockito.Mockito.inOrder;
45
import static org.mockito.Mockito.mock;
56
import static org.mockito.Mockito.when;
67

8+
import io.grpc.Deadline;
9+
import io.grpc.Deadline.Ticker;
710
import io.grpc.Server;
8-
import java.time.Duration;
9-
import java.time.temporal.ChronoUnit;
10-
import java.util.concurrent.TimeUnit;
1111
import org.junit.jupiter.api.Test;
1212
import org.mockito.InOrder;
1313

@@ -17,12 +17,14 @@ class ServerManagementUtilTest {
1717
void canShutdownGracefully() throws InterruptedException {
1818
Server mockServer = mock(Server.class);
1919
when(mockServer.isTerminated()).thenReturn(true);
20+
Ticker mockTicker = mock(Ticker.class);
21+
when(mockTicker.nanoTime()).thenReturn(0L);
2022
ServerManagementUtil.shutdownServer(
21-
mockServer, "mockServer", Duration.of(10, ChronoUnit.MILLIS));
23+
mockServer, "mockServer", Deadline.after(10, MILLISECONDS, mockTicker));
2224

2325
InOrder serverVerifier = inOrder(mockServer);
2426
serverVerifier.verify(mockServer).shutdown();
25-
serverVerifier.verify(mockServer).awaitTermination(10, TimeUnit.MILLISECONDS);
27+
serverVerifier.verify(mockServer).awaitTermination(10, MILLISECONDS);
2628
serverVerifier.verify(mockServer).isTerminated();
2729
serverVerifier.verifyNoMoreInteractions();
2830
}
@@ -31,15 +33,17 @@ void canShutdownGracefully() throws InterruptedException {
3133
void canShutdownForcefully() throws InterruptedException {
3234
Server mockServer = mock(Server.class);
3335
when(mockServer.isTerminated()).thenReturn(false);
36+
Ticker mockTicker = mock(Ticker.class);
37+
when(mockTicker.nanoTime()).thenReturn(0L);
3438
ServerManagementUtil.shutdownServer(
35-
mockServer, "mockServer", Duration.of(10, ChronoUnit.MILLIS));
39+
mockServer, "mockServer", Deadline.after(10, MILLISECONDS, mockTicker));
3640

3741
InOrder serverVerifier = inOrder(mockServer);
3842
serverVerifier.verify(mockServer).shutdown();
39-
serverVerifier.verify(mockServer).awaitTermination(10, TimeUnit.MILLISECONDS);
43+
serverVerifier.verify(mockServer).awaitTermination(10, MILLISECONDS);
4044
serverVerifier.verify(mockServer).isTerminated();
4145
serverVerifier.verify(mockServer).shutdownNow();
42-
serverVerifier.verify(mockServer).awaitTermination(5000, TimeUnit.MILLISECONDS);
46+
serverVerifier.verify(mockServer).awaitTermination(5010, MILLISECONDS);
4347
serverVerifier.verify(mockServer).isTerminated();
4448
serverVerifier.verifyNoMoreInteractions();
4549
}

0 commit comments

Comments
 (0)