Skip to content

Commit 2429c76

Browse files
feat: unload partitions when not publishing for better performance
1 parent 47e449c commit 2429c76

File tree

4 files changed

+251
-33
lines changed

4 files changed

+251
-33
lines changed

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SinglePartitionPublisherBuilder.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,16 @@
2222
import com.google.cloud.pubsublite.MessageMetadata;
2323
import com.google.cloud.pubsublite.Partition;
2424
import com.google.cloud.pubsublite.TopicPath;
25+
import com.google.cloud.pubsublite.internal.AlarmFactory;
2526
import com.google.cloud.pubsublite.internal.Publisher;
2627
import com.google.cloud.pubsublite.internal.wire.StreamFactories.PublishStreamFactory;
2728
import com.google.common.annotations.VisibleForTesting;
29+
import java.time.Duration;
2830

2931
@AutoValue
3032
public abstract class SinglePartitionPublisherBuilder {
33+
private static final Duration DEFAULT_UNLOAD_PERIOD = Duration.ofMinutes(5);
34+
3135
// Required parameters.
3236
abstract TopicPath topic();
3337

@@ -37,12 +41,16 @@ public abstract class SinglePartitionPublisherBuilder {
3741

3842
abstract BatchingSettings batchingSettings();
3943

44+
// Optional parameters.
45+
abstract Duration unloadPeriod();
46+
4047
// For testing.
4148
abstract PublisherBuilder.Builder underlyingBuilder();
4249

4350
public static Builder newBuilder() {
4451
return new AutoValue_SinglePartitionPublisherBuilder.Builder()
45-
.setUnderlyingBuilder(PublisherBuilder.builder());
52+
.setUnderlyingBuilder(PublisherBuilder.builder())
53+
.setUnloadPeriod(DEFAULT_UNLOAD_PERIOD);
4654
}
4755

4856
@AutoValue.Builder
@@ -57,6 +65,9 @@ public abstract static class Builder {
5765

5866
public abstract Builder setBatchingSettings(BatchingSettings batchingSettings);
5967

68+
// Optional parameters.
69+
public abstract Builder setUnloadPeriod(Duration unloadPeriod);
70+
6071
// For testing.
6172
@VisibleForTesting
6273
abstract Builder setUnderlyingBuilder(PublisherBuilder.Builder underlyingBuilder);
@@ -72,7 +83,11 @@ public Publisher<MessageMetadata> build() throws ApiException {
7283
.setPartition(builder.partition())
7384
.setStreamFactory(builder.streamFactory())
7485
.setBatching(builder.batchingSettings());
75-
return new SinglePartitionPublisher(publisherBuilder.build(), builder.partition());
86+
Partition partition = builder.partition();
87+
Duration unloadPeriod = builder.unloadPeriod();
88+
return new UnloadingPublisher(
89+
() -> new SinglePartitionPublisher(publisherBuilder.build(), partition),
90+
AlarmFactory.create(unloadPeriod));
7691
}
7792
}
7893
}
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
/*
2+
* Copyright 2020 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.pubsublite.internal.wire;
18+
19+
import com.google.api.core.AbstractApiService;
20+
import com.google.api.core.ApiFuture;
21+
import com.google.cloud.pubsublite.Message;
22+
import com.google.cloud.pubsublite.MessageMetadata;
23+
import com.google.cloud.pubsublite.internal.AlarmFactory;
24+
import com.google.cloud.pubsublite.internal.Publisher;
25+
import java.io.IOException;
26+
import java.util.Optional;
27+
import java.util.concurrent.Future;
28+
import java.util.function.Supplier;
29+
import javax.annotation.concurrent.GuardedBy;
30+
31+
/** A publisher which tears down connections when inactive. */
32+
class UnloadingPublisher extends AbstractApiService implements Publisher<MessageMetadata> {
33+
private final Supplier<Publisher<MessageMetadata>> supplier;
34+
private final Future<?> alarmFuture;
35+
36+
@GuardedBy("this")
37+
private Optional<Publisher<MessageMetadata>> publisher = Optional.empty();
38+
39+
@GuardedBy("this")
40+
private boolean sawPublish = false;
41+
42+
UnloadingPublisher(Supplier<Publisher<MessageMetadata>> supplier, AlarmFactory unloadAlarm) {
43+
this.supplier = supplier;
44+
this.alarmFuture = unloadAlarm.newAlarm(this::onUnloadAlarm);
45+
}
46+
47+
@Override
48+
protected void doStart() {
49+
notifyStarted();
50+
}
51+
52+
@Override
53+
protected synchronized void doStop() {
54+
alarmFuture.cancel(false);
55+
if (!publisher.isPresent()) {
56+
notifyStopped();
57+
}
58+
publisher
59+
.get()
60+
.addListener(
61+
new Listener() {
62+
@Override
63+
public void terminated(State from) {
64+
notifyStopped();
65+
}
66+
},
67+
SystemExecutors.getFuturesExecutor());
68+
publisher.get().stopAsync();
69+
}
70+
71+
@Override
72+
public synchronized ApiFuture<MessageMetadata> publish(Message message) {
73+
sawPublish = true;
74+
return getPublisher().publish(message);
75+
}
76+
77+
private synchronized Publisher<MessageMetadata> getPublisher() {
78+
if (!publisher.isPresent()) {
79+
publisher = Optional.of(supplier.get());
80+
publisher
81+
.get()
82+
.addListener(
83+
new Listener() {
84+
@Override
85+
public void failed(State from, Throwable failure) {
86+
notifyFailed(failure);
87+
}
88+
},
89+
SystemExecutors.getFuturesExecutor());
90+
publisher.get().startAsync().awaitRunning();
91+
}
92+
return publisher.get();
93+
}
94+
95+
@Override
96+
public synchronized void cancelOutstandingPublishes() {
97+
if (publisher.isPresent()) {
98+
publisher.get().cancelOutstandingPublishes();
99+
}
100+
}
101+
102+
@Override
103+
public synchronized void flush() throws IOException {
104+
if (publisher.isPresent()) {
105+
publisher.get().flush();
106+
}
107+
}
108+
109+
private synchronized void onUnloadAlarm() {
110+
if (publisher.isPresent() && !sawPublish) {
111+
publisher.get().stopAsync().awaitTerminated();
112+
publisher = Optional.empty();
113+
}
114+
sawPublish = false;
115+
}
116+
}

google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/SinglePartitionPublisherTest.java

Lines changed: 1 addition & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -17,69 +17,39 @@
1717
package com.google.cloud.pubsublite.internal.wire;
1818

1919
import static com.google.common.truth.Truth.assertThat;
20-
import static org.mockito.Mockito.RETURNS_SELF;
21-
import static org.mockito.Mockito.mock;
2220
import static org.mockito.Mockito.times;
2321
import static org.mockito.Mockito.verify;
2422
import static org.mockito.Mockito.when;
2523
import static org.mockito.MockitoAnnotations.initMocks;
2624

2725
import com.google.api.core.ApiFuture;
2826
import com.google.api.core.SettableApiFuture;
29-
import com.google.api.gax.batching.BatchingSettings;
30-
import com.google.cloud.pubsublite.CloudRegion;
31-
import com.google.cloud.pubsublite.CloudZone;
3227
import com.google.cloud.pubsublite.Message;
3328
import com.google.cloud.pubsublite.MessageMetadata;
3429
import com.google.cloud.pubsublite.Offset;
3530
import com.google.cloud.pubsublite.Partition;
36-
import com.google.cloud.pubsublite.ProjectNumber;
37-
import com.google.cloud.pubsublite.TopicName;
38-
import com.google.cloud.pubsublite.TopicPath;
3931
import com.google.cloud.pubsublite.internal.Publisher;
4032
import com.google.cloud.pubsublite.internal.testing.FakeApiService;
41-
import com.google.cloud.pubsublite.internal.wire.StreamFactories.PublishStreamFactory;
4233
import com.google.protobuf.ByteString;
4334
import org.junit.Before;
4435
import org.junit.Test;
4536
import org.junit.runner.RunWith;
4637
import org.junit.runners.JUnit4;
47-
import org.mockito.Mock;
4838
import org.mockito.Spy;
4939

5040
@RunWith(JUnit4.class)
5141
public class SinglePartitionPublisherTest {
5242
abstract static class FakeOffsetPublisher extends FakeApiService implements Publisher<Offset> {}
5343

5444
@Spy private FakeOffsetPublisher underlying;
55-
@Mock private PublishStreamFactory streamFactory;
5645

5746
private Publisher<MessageMetadata> pub;
5847

5948
@Before
6049
public void setUp() {
6150
initMocks(this);
62-
63-
TopicPath topic =
64-
TopicPath.newBuilder()
65-
.setName(TopicName.of("abc"))
66-
.setProject(ProjectNumber.of(123))
67-
.setLocation(CloudZone.of(CloudRegion.of("us-central1"), 'a'))
68-
.build();
6951
Partition partition = Partition.of(3);
70-
71-
PublisherBuilder.Builder mockBuilder = mock(PublisherBuilder.Builder.class, RETURNS_SELF);
72-
when(mockBuilder.build()).thenReturn(underlying);
73-
74-
when(mockBuilder.setTopic(topic)).thenReturn(mockBuilder);
75-
this.pub =
76-
SinglePartitionPublisherBuilder.newBuilder()
77-
.setTopic(topic)
78-
.setPartition(partition)
79-
.setUnderlyingBuilder(mockBuilder)
80-
.setBatchingSettings(BatchingSettings.newBuilder().setIsEnabled(false).build())
81-
.setStreamFactory(streamFactory)
82-
.build();
52+
this.pub = new SinglePartitionPublisher(underlying, partition);
8353
this.pub.startAsync().awaitRunning();
8454
}
8555

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/*
2+
* Copyright 2020 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.pubsublite.internal.wire;
18+
19+
import static com.google.cloud.pubsublite.internal.testing.RetryingConnectionHelpers.whenFailed;
20+
import static com.google.common.truth.Truth.assertThat;
21+
import static org.mockito.ArgumentMatchers.any;
22+
import static org.mockito.Mockito.doReturn;
23+
import static org.mockito.Mockito.when;
24+
25+
import com.google.api.core.ApiFuture;
26+
import com.google.api.core.SettableApiFuture;
27+
import com.google.cloud.pubsublite.Message;
28+
import com.google.cloud.pubsublite.MessageMetadata;
29+
import com.google.cloud.pubsublite.internal.AlarmFactory;
30+
import com.google.cloud.pubsublite.internal.Publisher;
31+
import com.google.cloud.pubsublite.internal.testing.FakeApiService;
32+
import com.google.protobuf.ByteString;
33+
import java.util.Objects;
34+
import java.util.concurrent.Future;
35+
import java.util.function.Supplier;
36+
import org.junit.Before;
37+
import org.junit.Rule;
38+
import org.junit.Test;
39+
import org.junit.runner.RunWith;
40+
import org.junit.runners.JUnit4;
41+
import org.mockito.Mock;
42+
import org.mockito.Spy;
43+
import org.mockito.junit.MockitoJUnit;
44+
import org.mockito.junit.MockitoRule;
45+
import org.mockito.quality.Strictness;
46+
47+
@RunWith(JUnit4.class)
48+
public class UnloadingPublisherTest {
49+
private static final Message MESSAGE =
50+
Message.builder().setData(ByteString.copyFromUtf8("abc")).build();
51+
52+
@Rule public MockitoRule mockito = MockitoJUnit.rule().strictness(Strictness.STRICT_STUBS);
53+
54+
@Mock private Supplier<Publisher<MessageMetadata>> supplier;
55+
56+
abstract static class FakePublisher extends FakeApiService
57+
implements Publisher<MessageMetadata> {};
58+
59+
@Spy private FakePublisher underlying1;
60+
@Spy private FakePublisher underlying2;
61+
62+
@Mock private AlarmFactory alarmFactory;
63+
private Runnable onAlarm;
64+
65+
private UnloadingPublisher publisher;
66+
67+
@Before
68+
public void setUp() {
69+
when(alarmFactory.newAlarm(any()))
70+
.thenAnswer(
71+
args -> {
72+
onAlarm = args.getArgument(0);
73+
return SettableApiFuture.create();
74+
});
75+
publisher = new UnloadingPublisher(supplier, alarmFactory);
76+
Objects.requireNonNull(onAlarm);
77+
publisher.startAsync().awaitRunning();
78+
}
79+
80+
@Test
81+
public void loadOnPublish() {
82+
doReturn(underlying1).when(supplier).get();
83+
SettableApiFuture<MessageMetadata> future = SettableApiFuture.create();
84+
doReturn(future).when(underlying1).publish(MESSAGE);
85+
86+
assertThat(publisher.publish(MESSAGE)).isSameInstanceAs(future);
87+
}
88+
89+
@Test
90+
public void underlyingFailureFails() throws Exception {
91+
doReturn(underlying1).when(supplier).get();
92+
doReturn(SettableApiFuture.create()).when(underlying1).publish(MESSAGE);
93+
ApiFuture<?> unused = publisher.publish(MESSAGE);
94+
95+
Future<Void> failed = whenFailed(publisher);
96+
underlying1.fail(new IllegalStateException("bad"));
97+
failed.get();
98+
}
99+
100+
@Test
101+
public void twoAlarmsWithoutPublishShutsDown() throws Exception {
102+
doReturn(underlying1).when(supplier).get();
103+
doReturn(SettableApiFuture.create()).when(underlying1).publish(MESSAGE);
104+
ApiFuture<?> future1 = publisher.publish(MESSAGE);
105+
106+
onAlarm.run();
107+
108+
assertThat(underlying1.isRunning()).isTrue();
109+
110+
ApiFuture<?> future2 = publisher.publish(MESSAGE);
111+
onAlarm.run();
112+
assertThat(underlying1.isRunning()).isTrue();
113+
114+
onAlarm.run();
115+
assertThat(underlying1.isRunning()).isFalse();
116+
}
117+
}

0 commit comments

Comments
 (0)