Skip to content

Commit 502484a

Browse files
fix: Implement ChannelCache, which aggregates stubs to a single channel, and properly cleans them up on teardown. (#72)
1 parent 603b4a7 commit 502484a

File tree

2 files changed

+56
-2
lines changed

2 files changed

+56
-2
lines changed

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/Stubs.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,21 @@
1717
package com.google.cloud.pubsublite;
1818

1919
import com.google.auth.oauth2.GoogleCredentials;
20+
import com.google.cloud.pubsublite.internal.ChannelCache;
2021
import com.google.common.collect.ImmutableList;
2122
import io.grpc.Channel;
22-
import io.grpc.ManagedChannelBuilder;
2323
import io.grpc.auth.MoreCallCredentials;
2424
import io.grpc.stub.AbstractStub;
2525
import java.io.IOException;
2626
import java.util.function.Function;
2727

2828
public class Stubs {
29+
private static final ChannelCache channels = new ChannelCache();
30+
2931
public static <StubT extends AbstractStub<StubT>> StubT defaultStub(
3032
String target, Function<Channel, StubT> stubFactory) throws IOException {
3133
return stubFactory
32-
.apply(ManagedChannelBuilder.forTarget(target).build())
34+
.apply(channels.get(target))
3335
.withCallCredentials(
3436
MoreCallCredentials.from(
3537
GoogleCredentials.getApplicationDefault()
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright 2020 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.pubsublite.internal;
18+
19+
import io.grpc.Channel;
20+
import io.grpc.ManagedChannel;
21+
import io.grpc.ManagedChannelBuilder;
22+
import java.util.concurrent.ConcurrentHashMap;
23+
import java.util.concurrent.TimeUnit;
24+
25+
/** A ChannelCache creates and stores default channels for use with api methods. */
26+
public class ChannelCache {
27+
private final ConcurrentHashMap<String, ManagedChannel> channels = new ConcurrentHashMap<>();
28+
29+
public ChannelCache() {
30+
Runtime.getRuntime().addShutdownHook(new Thread(this::onShutdown));
31+
}
32+
33+
private void onShutdown() {
34+
channels.forEachValue(
35+
channels.size(),
36+
channel -> {
37+
try {
38+
channel.shutdownNow().awaitTermination(60, TimeUnit.SECONDS);
39+
} catch (InterruptedException e) {
40+
e.printStackTrace();
41+
}
42+
});
43+
}
44+
45+
public Channel get(String target) {
46+
return channels.computeIfAbsent(target, this::newChannel);
47+
}
48+
49+
private ManagedChannel newChannel(String target) {
50+
return ManagedChannelBuilder.forTarget(target).build();
51+
}
52+
}

0 commit comments

Comments
 (0)