Skip to content

Commit 36b472b

Browse files
fix: Create a pool of Channels for each target.
1 parent 61b1981 commit 36b472b

File tree

2 files changed

+59
-17
lines changed

2 files changed

+59
-17
lines changed

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/ChannelCache.java

Lines changed: 41 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,27 @@
1717
package com.google.cloud.pubsublite.internal;
1818

1919
import com.google.common.annotations.VisibleForTesting;
20+
import com.google.common.flogger.GoogleLogger;
2021
import io.grpc.Channel;
2122
import io.grpc.ManagedChannel;
2223
import io.grpc.ManagedChannelBuilder;
24+
import java.util.Deque;
25+
import java.util.LinkedList;
2326
import java.util.concurrent.ConcurrentHashMap;
2427
import java.util.concurrent.TimeUnit;
2528
import java.util.function.Function;
2629

2730
/** A ChannelCache creates and stores default channels for use with api methods. */
2831
public class ChannelCache {
32+
private static final GoogleLogger log = GoogleLogger.forEnclosingClass();
33+
2934
private final Function<String, ManagedChannel> channelFactory;
30-
private final ConcurrentHashMap<String, ManagedChannel> channels = new ConcurrentHashMap<>();
35+
private final ConcurrentHashMap<String, Deque<ManagedChannel>> channels =
36+
new ConcurrentHashMap<>();
37+
38+
private static final int NUMBER_OF_CHANNELS_PER_TARGET = 10;
39+
private static final String NUMBER_OF_CHANNELS_PER_TARGET_VM_OVERRIDE =
40+
"google.cloud.pubsublite.channelCacheSize";
3141

3242
public ChannelCache() {
3343
this(ChannelCache::newChannel);
@@ -40,20 +50,45 @@ public ChannelCache() {
4050
}
4151

4252
@VisibleForTesting
43-
void onShutdown() {
53+
synchronized void onShutdown() {
4454
channels.forEachValue(
4555
channels.size(),
46-
channel -> {
56+
channels -> {
4757
try {
48-
channel.shutdownNow().awaitTermination(60, TimeUnit.SECONDS);
58+
for (ManagedChannel channel : channels) {
59+
channel.shutdownNow().awaitTermination(60, TimeUnit.SECONDS);
60+
}
4961
} catch (InterruptedException e) {
5062
e.printStackTrace();
5163
}
5264
});
5365
}
5466

55-
public Channel get(String target) {
56-
return channels.computeIfAbsent(target, channelFactory);
67+
public synchronized Channel get(String target) {
68+
Deque<ManagedChannel> channelQueue = channels.computeIfAbsent(target, this::newChannels);
69+
ManagedChannel channel = channelQueue.removeFirst();
70+
channelQueue.addLast(channel);
71+
return channel;
72+
}
73+
74+
private Deque<ManagedChannel> newChannels(String target) {
75+
int numberOfChannels = NUMBER_OF_CHANNELS_PER_TARGET;
76+
String numberOfChannelsOverride = System.getProperty(NUMBER_OF_CHANNELS_PER_TARGET_VM_OVERRIDE);
77+
if (numberOfChannelsOverride != null && !numberOfChannelsOverride.isEmpty()) {
78+
try {
79+
numberOfChannels = Integer.parseInt((numberOfChannelsOverride));
80+
} catch (NumberFormatException e) {
81+
log.atSevere().log(
82+
"Unable to parse override for number of channels per target: %s",
83+
numberOfChannelsOverride);
84+
}
85+
}
86+
87+
Deque<ManagedChannel> channels = new LinkedList<>();
88+
for (int i = 0; i < numberOfChannels; i++) {
89+
channels.add(channelFactory.apply(target));
90+
}
91+
return channels;
5792
}
5893

5994
private static ManagedChannel newChannel(String target) {

google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/ChannelCacheTest.java

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,14 @@
1717
package com.google.cloud.pubsublite.internal;
1818

1919
import static com.google.common.truth.Truth.assertThat;
20-
import static org.mockito.ArgumentMatchers.any;
21-
import static org.mockito.Mockito.times;
22-
import static org.mockito.Mockito.verify;
20+
import static org.mockito.Mockito.mock;
2321
import static org.mockito.Mockito.when;
2422
import static org.mockito.MockitoAnnotations.initMocks;
2523

2624
import io.grpc.Channel;
2725
import io.grpc.ManagedChannel;
26+
import java.util.HashSet;
27+
import java.util.Set;
2828
import java.util.function.Function;
2929
import org.junit.Before;
3030
import org.junit.Test;
@@ -34,7 +34,6 @@
3434

3535
@RunWith(JUnit4.class)
3636
public class ChannelCacheTest {
37-
@Mock ManagedChannel mockChannel;
3837
@Mock Function<String, ManagedChannel> channelFactory;
3938

4039
@Before
@@ -44,14 +43,22 @@ public void setUp() {
4443

4544
@Test
4645
public void reusesChannels() {
47-
when(channelFactory.apply(any())).thenReturn(mockChannel);
46+
when(channelFactory.apply("abc"))
47+
.thenAnswer(
48+
(target) -> {
49+
ManagedChannel channel = mock(ManagedChannel.class);
50+
when(channel.shutdownNow()).thenReturn(channel);
51+
return channel;
52+
});
4853
ChannelCache cache = new ChannelCache(channelFactory);
49-
Channel chan1 = cache.get("abc");
50-
Channel chan2 = cache.get("abc");
51-
assertThat(chan1).isEqualTo(chan2);
52-
verify(channelFactory, times(1)).apply("abc");
53-
when(mockChannel.shutdownNow()).thenReturn(mockChannel);
54+
55+
// Only 10 Channels should be created.
56+
Set<Channel> channels = new HashSet<>();
57+
for (int i = 0; i < 20; i++) {
58+
channels.add(cache.get("abc"));
59+
}
60+
61+
assertThat(channels.size()).isEqualTo(10);
5462
cache.onShutdown();
55-
verify(mockChannel, times(1)).shutdownNow();
5663
}
5764
}

0 commit comments

Comments
 (0)