diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SinglePartitionPublisherBuilder.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SinglePartitionPublisherBuilder.java index 60fb2d86d..8f188094b 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SinglePartitionPublisherBuilder.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SinglePartitionPublisherBuilder.java @@ -22,12 +22,16 @@ import com.google.cloud.pubsublite.MessageMetadata; import com.google.cloud.pubsublite.Partition; import com.google.cloud.pubsublite.TopicPath; +import com.google.cloud.pubsublite.internal.AlarmFactory; import com.google.cloud.pubsublite.internal.Publisher; import com.google.cloud.pubsublite.internal.wire.StreamFactories.PublishStreamFactory; import com.google.common.annotations.VisibleForTesting; +import java.time.Duration; @AutoValue public abstract class SinglePartitionPublisherBuilder { + private static final Duration DEFAULT_UNLOAD_PERIOD = Duration.ofMinutes(5); + // Required parameters. abstract TopicPath topic(); @@ -37,12 +41,16 @@ public abstract class SinglePartitionPublisherBuilder { abstract BatchingSettings batchingSettings(); + // Optional parameters. + abstract Duration unloadPeriod(); + // For testing. abstract PublisherBuilder.Builder underlyingBuilder(); public static Builder newBuilder() { return new AutoValue_SinglePartitionPublisherBuilder.Builder() - .setUnderlyingBuilder(PublisherBuilder.builder()); + .setUnderlyingBuilder(PublisherBuilder.builder()) + .setUnloadPeriod(DEFAULT_UNLOAD_PERIOD); } @AutoValue.Builder @@ -57,6 +65,9 @@ public abstract static class Builder { public abstract Builder setBatchingSettings(BatchingSettings batchingSettings); + // Optional parameters. + public abstract Builder setUnloadPeriod(Duration unloadPeriod); + // For testing. @VisibleForTesting abstract Builder setUnderlyingBuilder(PublisherBuilder.Builder underlyingBuilder); @@ -72,7 +83,11 @@ public Publisher build() throws ApiException { .setPartition(builder.partition()) .setStreamFactory(builder.streamFactory()) .setBatching(builder.batchingSettings()); - return new SinglePartitionPublisher(publisherBuilder.build(), builder.partition()); + Partition partition = builder.partition(); + Duration unloadPeriod = builder.unloadPeriod(); + return new UnloadingPublisher( + () -> new SinglePartitionPublisher(publisherBuilder.build(), partition), + AlarmFactory.create(unloadPeriod)); } } } diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/UnloadingPublisher.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/UnloadingPublisher.java new file mode 100644 index 000000000..11d504071 --- /dev/null +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/UnloadingPublisher.java @@ -0,0 +1,117 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsublite.internal.wire; + +import com.google.api.core.AbstractApiService; +import com.google.api.core.ApiFuture; +import com.google.cloud.pubsublite.Message; +import com.google.cloud.pubsublite.MessageMetadata; +import com.google.cloud.pubsublite.internal.AlarmFactory; +import com.google.cloud.pubsublite.internal.Publisher; +import java.io.IOException; +import java.util.Optional; +import java.util.concurrent.Future; +import java.util.function.Supplier; +import javax.annotation.concurrent.GuardedBy; + +/** A publisher which tears down connections when inactive. */ +class UnloadingPublisher extends AbstractApiService implements Publisher { + private final Supplier> supplier; + private final Future alarmFuture; + + @GuardedBy("this") + private Optional> publisher = Optional.empty(); + + @GuardedBy("this") + private boolean sawPublish = false; + + UnloadingPublisher(Supplier> supplier, AlarmFactory unloadAlarm) { + this.supplier = supplier; + this.alarmFuture = unloadAlarm.newAlarm(this::onUnloadAlarm); + } + + @Override + protected void doStart() { + notifyStarted(); + } + + @Override + protected synchronized void doStop() { + alarmFuture.cancel(false); + if (!publisher.isPresent()) { + notifyStopped(); + return; + } + publisher + .get() + .addListener( + new Listener() { + @Override + public void terminated(State from) { + notifyStopped(); + } + }, + SystemExecutors.getFuturesExecutor()); + publisher.get().stopAsync(); + } + + @Override + public synchronized ApiFuture publish(Message message) { + sawPublish = true; + return getPublisher().publish(message); + } + + private synchronized Publisher getPublisher() { + if (!publisher.isPresent()) { + publisher = Optional.of(supplier.get()); + publisher + .get() + .addListener( + new Listener() { + @Override + public void failed(State from, Throwable failure) { + notifyFailed(failure); + } + }, + SystemExecutors.getFuturesExecutor()); + publisher.get().startAsync().awaitRunning(); + } + return publisher.get(); + } + + @Override + public synchronized void cancelOutstandingPublishes() { + if (publisher.isPresent()) { + publisher.get().cancelOutstandingPublishes(); + } + } + + @Override + public synchronized void flush() throws IOException { + if (publisher.isPresent()) { + publisher.get().flush(); + } + } + + private synchronized void onUnloadAlarm() { + if (publisher.isPresent() && !sawPublish) { + publisher.get().stopAsync().awaitTerminated(); + publisher = Optional.empty(); + } + sawPublish = false; + } +} diff --git a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/SinglePartitionPublisherTest.java b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/SinglePartitionPublisherTest.java index ddbe91fb7..ae38e7fca 100644 --- a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/SinglePartitionPublisherTest.java +++ b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/SinglePartitionPublisherTest.java @@ -17,8 +17,6 @@ package com.google.cloud.pubsublite.internal.wire; import static com.google.common.truth.Truth.assertThat; -import static org.mockito.Mockito.RETURNS_SELF; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -26,25 +24,17 @@ import com.google.api.core.ApiFuture; import com.google.api.core.SettableApiFuture; -import com.google.api.gax.batching.BatchingSettings; -import com.google.cloud.pubsublite.CloudRegion; -import com.google.cloud.pubsublite.CloudZone; import com.google.cloud.pubsublite.Message; import com.google.cloud.pubsublite.MessageMetadata; import com.google.cloud.pubsublite.Offset; import com.google.cloud.pubsublite.Partition; -import com.google.cloud.pubsublite.ProjectNumber; -import com.google.cloud.pubsublite.TopicName; -import com.google.cloud.pubsublite.TopicPath; import com.google.cloud.pubsublite.internal.Publisher; import com.google.cloud.pubsublite.internal.testing.FakeApiService; -import com.google.cloud.pubsublite.internal.wire.StreamFactories.PublishStreamFactory; import com.google.protobuf.ByteString; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -import org.mockito.Mock; import org.mockito.Spy; @RunWith(JUnit4.class) @@ -52,34 +42,14 @@ public class SinglePartitionPublisherTest { abstract static class FakeOffsetPublisher extends FakeApiService implements Publisher {} @Spy private FakeOffsetPublisher underlying; - @Mock private PublishStreamFactory streamFactory; private Publisher pub; @Before public void setUp() { initMocks(this); - - TopicPath topic = - TopicPath.newBuilder() - .setName(TopicName.of("abc")) - .setProject(ProjectNumber.of(123)) - .setLocation(CloudZone.of(CloudRegion.of("us-central1"), 'a')) - .build(); Partition partition = Partition.of(3); - - PublisherBuilder.Builder mockBuilder = mock(PublisherBuilder.Builder.class, RETURNS_SELF); - when(mockBuilder.build()).thenReturn(underlying); - - when(mockBuilder.setTopic(topic)).thenReturn(mockBuilder); - this.pub = - SinglePartitionPublisherBuilder.newBuilder() - .setTopic(topic) - .setPartition(partition) - .setUnderlyingBuilder(mockBuilder) - .setBatchingSettings(BatchingSettings.newBuilder().setIsEnabled(false).build()) - .setStreamFactory(streamFactory) - .build(); + this.pub = new SinglePartitionPublisher(underlying, partition); this.pub.startAsync().awaitRunning(); } diff --git a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/UnloadingPublisherTest.java b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/UnloadingPublisherTest.java new file mode 100644 index 000000000..7ee1deabb --- /dev/null +++ b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/UnloadingPublisherTest.java @@ -0,0 +1,117 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsublite.internal.wire; + +import static com.google.cloud.pubsublite.internal.testing.RetryingConnectionHelpers.whenFailed; +import static com.google.common.truth.Truth.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.when; + +import com.google.api.core.ApiFuture; +import com.google.api.core.SettableApiFuture; +import com.google.cloud.pubsublite.Message; +import com.google.cloud.pubsublite.MessageMetadata; +import com.google.cloud.pubsublite.internal.AlarmFactory; +import com.google.cloud.pubsublite.internal.Publisher; +import com.google.cloud.pubsublite.internal.testing.FakeApiService; +import com.google.protobuf.ByteString; +import java.util.Objects; +import java.util.concurrent.Future; +import java.util.function.Supplier; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.Spy; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; +import org.mockito.quality.Strictness; + +@RunWith(JUnit4.class) +public class UnloadingPublisherTest { + private static final Message MESSAGE = + Message.builder().setData(ByteString.copyFromUtf8("abc")).build(); + + @Rule public MockitoRule mockito = MockitoJUnit.rule().strictness(Strictness.STRICT_STUBS); + + @Mock private Supplier> supplier; + + abstract static class FakePublisher extends FakeApiService + implements Publisher {}; + + @Spy private FakePublisher underlying1; + @Spy private FakePublisher underlying2; + + @Mock private AlarmFactory alarmFactory; + private Runnable onAlarm; + + private UnloadingPublisher publisher; + + @Before + public void setUp() { + when(alarmFactory.newAlarm(any())) + .thenAnswer( + args -> { + onAlarm = args.getArgument(0); + return SettableApiFuture.create(); + }); + publisher = new UnloadingPublisher(supplier, alarmFactory); + Objects.requireNonNull(onAlarm); + publisher.startAsync().awaitRunning(); + } + + @Test + public void loadOnPublish() { + doReturn(underlying1).when(supplier).get(); + SettableApiFuture future = SettableApiFuture.create(); + doReturn(future).when(underlying1).publish(MESSAGE); + + assertThat(publisher.publish(MESSAGE)).isSameInstanceAs(future); + } + + @Test + public void underlyingFailureFails() throws Exception { + doReturn(underlying1).when(supplier).get(); + doReturn(SettableApiFuture.create()).when(underlying1).publish(MESSAGE); + ApiFuture unused = publisher.publish(MESSAGE); + + Future failed = whenFailed(publisher); + underlying1.fail(new IllegalStateException("bad")); + failed.get(); + } + + @Test + public void twoAlarmsWithoutPublishShutsDown() throws Exception { + doReturn(underlying1).when(supplier).get(); + doReturn(SettableApiFuture.create()).when(underlying1).publish(MESSAGE); + ApiFuture future1 = publisher.publish(MESSAGE); + + onAlarm.run(); + + assertThat(underlying1.isRunning()).isTrue(); + + ApiFuture future2 = publisher.publish(MESSAGE); + onAlarm.run(); + assertThat(underlying1.isRunning()).isTrue(); + + onAlarm.run(); + assertThat(underlying1.isRunning()).isFalse(); + } +}