Skip to content

Commit 74d61fd

Browse files
authored
fix: Return an error on publish if the publisher's state is not RUNNING (#81)
* Throw an exception on publish if the publisher state is not RUNNING * Immediately return failed future * Moved check to PublisherImpl * Tests for PublisherImplTest * Throw a permanent error when publishing before starting the service * Remove comment * Check for permanent failure by trying to start publisher
1 parent a56858e commit 74d61fd

File tree

2 files changed

+27
-13
lines changed

2 files changed

+27
-13
lines changed

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PublisherImpl.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import com.google.api.core.ApiFuture;
2222
import com.google.api.core.ApiFutures;
23+
import com.google.api.core.ApiService;
2324
import com.google.api.core.SettableApiFuture;
2425
import com.google.api.gax.batching.BatchingSettings;
2526
import com.google.cloud.pubsublite.Constants;
@@ -209,12 +210,11 @@ public ApiFuture<Offset> publish(Message message) {
209210
return ApiFutures.immediateFailedFuture(error.asException());
210211
}
211212
try (CloseableMonitor.Hold h = monitor.enter()) {
212-
if (shutdown) {
213-
return ApiFutures.immediateFailedFuture(
214-
Status.FAILED_PRECONDITION
215-
.withDescription("Published after the stream shut down.")
216-
.asException());
217-
}
213+
ApiService.State currentState = state();
214+
checkState(
215+
currentState == ApiService.State.RUNNING,
216+
String.format("Cannot publish when Publisher state is %s.", currentState.name()));
217+
checkState(!shutdown, "Published after the stream shut down.");
218218
ApiFuture<Offset> messageFuture = batcher.add(proto);
219219
if (batcher.shouldFlush()) {
220220
processBatch(batcher.flush());

google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/PublisherImplTest.java

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -116,20 +116,25 @@ public void setUp() throws StatusException {
116116
INITIAL_PUBLISH_REQUEST.getInitialRequest(),
117117
BATCHING_SETTINGS_THAT_NEVER_FIRE);
118118
publisher.addListener(permanentErrorHandler, MoreExecutors.directExecutor());
119+
}
120+
121+
private void startPublisher() {
119122
publisher.startAsync().awaitRunning();
123+
120124
assertThat(leakedOffsetStream).isNotNull();
125+
verify(mockPublisherFactory).New(any(), any(), eq(INITIAL_PUBLISH_REQUEST));
121126
}
122127

123128
@Test
124129
public void construct_CallsFactoryNew() {
125-
verify(mockPublisherFactory).New(any(), any(), eq(INITIAL_PUBLISH_REQUEST));
130+
startPublisher();
126131
verifyNoMoreInteractions(mockPublisherFactory);
127132
verifyZeroInteractions(mockBatchPublisher);
128133
}
129134

130135
@Test
131136
public void construct_FlushSendsBatched() throws Exception {
132-
verify(mockPublisherFactory).New(any(), any(), eq(INITIAL_PUBLISH_REQUEST));
137+
startPublisher();
133138
Message message = Message.builder().build();
134139
Future<Offset> future = publisher.publish(message);
135140

@@ -151,7 +156,7 @@ public void construct_FlushSendsBatched() throws Exception {
151156

152157
@Test
153158
public void construct_CloseSendsBatched() throws Exception {
154-
verify(mockPublisherFactory).New(any(), any(), eq(INITIAL_PUBLISH_REQUEST));
159+
startPublisher();
155160
Message message = Message.builder().build();
156161
Future<Offset> future = publisher.publish(message);
157162

@@ -172,9 +177,18 @@ public void construct_CloseSendsBatched() throws Exception {
172177
verifyNoMoreInteractions(mockBatchPublisher);
173178
}
174179

180+
@Test
181+
public void publishBeforeStart_IsPermanentError() throws Exception {
182+
Message message = Message.builder().build();
183+
assertThrows(IllegalStateException.class, () -> publisher.publish(message));
184+
assertThrows(IllegalStateException.class, () -> publisher.startAsync().awaitRunning());
185+
verifyZeroInteractions(mockPublisherFactory);
186+
verifyZeroInteractions(mockBatchPublisher);
187+
}
188+
175189
@Test
176190
public void publishAfterError_IsError() throws Exception {
177-
verify(mockPublisherFactory).New(any(), any(), eq(INITIAL_PUBLISH_REQUEST));
191+
startPublisher();
178192
leakedOffsetStream.onError(Status.PERMISSION_DENIED.asRuntimeException());
179193
assertThrows(IllegalStateException.class, publisher::awaitTerminated);
180194
errorOccurredLatch.await();
@@ -191,7 +205,7 @@ public void publishAfterError_IsError() throws Exception {
191205

192206
@Test
193207
public void multipleBatches_Ok() throws Exception {
194-
verify(mockPublisherFactory).New(any(), any(), eq(INITIAL_PUBLISH_REQUEST));
208+
startPublisher();
195209
Message message1 = Message.builder().build();
196210
Message message2 = Message.builder().setData(ByteString.copyFromUtf8("data")).build();
197211
Message message3 = Message.builder().setData(ByteString.copyFromUtf8("other_data")).build();
@@ -226,7 +240,7 @@ public void multipleBatches_Ok() throws Exception {
226240

227241
@Test
228242
public void retryableError_RecreatesAndRetriesAll() throws Exception {
229-
verify(mockPublisherFactory).New(any(), any(), eq(INITIAL_PUBLISH_REQUEST));
243+
startPublisher();
230244
Message message1 = Message.builder().setData(ByteString.copyFromUtf8("message1")).build();
231245
Message message2 = Message.builder().setData(ByteString.copyFromUtf8("message2")).build();
232246
Future<Offset> future1 = publisher.publish(message1);
@@ -276,7 +290,7 @@ public void retryableError_RecreatesAndRetriesAll() throws Exception {
276290

277291
@Test
278292
public void invalidOffsetSequence_SetsPermanentException() throws Exception {
279-
verify(mockPublisherFactory).New(any(), any(), eq(INITIAL_PUBLISH_REQUEST));
293+
startPublisher();
280294
Message message1 = Message.builder().build();
281295
Message message2 = Message.builder().setData(ByteString.copyFromUtf8("data")).build();
282296
Message message3 = Message.builder().setData(ByteString.copyFromUtf8("other_data")).build();

0 commit comments

Comments
 (0)