Skip to content

Commit 74f0a6e

Browse files
chore: Fix flaky PubsubLiteSinkTest by adding explicit bundling to messages using TestStream. (#83)
1 parent 209cae5 commit 74f0a6e

File tree

1 file changed

+44
-6
lines changed

1 file changed

+44
-6
lines changed

pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/PubsubLiteSinkTest.java

Lines changed: 44 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@
1919
import static com.google.common.truth.Truth.assertThat;
2020
import static com.google.common.truth.Truth8.assertThat;
2121
import static org.junit.Assert.assertThrows;
22+
import static org.junit.Assert.fail;
2223
import static org.mockito.ArgumentMatchers.any;
2324
import static org.mockito.Mockito.doAnswer;
24-
import static org.mockito.Mockito.mock;
2525
import static org.mockito.Mockito.times;
2626
import static org.mockito.Mockito.verify;
2727
import static org.mockito.Mockito.when;
@@ -39,12 +39,15 @@
3939
import com.google.cloud.pubsublite.TopicName;
4040
import com.google.cloud.pubsublite.TopicPaths;
4141
import com.google.cloud.pubsublite.internal.ExtractStatus;
42+
import com.google.cloud.pubsublite.internal.FakeApiService;
4243
import com.google.cloud.pubsublite.internal.Publisher;
4344
import com.google.common.collect.ImmutableList;
4445
import com.google.protobuf.ByteString;
4546
import io.grpc.Status;
4647
import io.grpc.StatusException;
4748
import java.util.Optional;
49+
import java.util.concurrent.CountDownLatch;
50+
import java.util.concurrent.ExecutorService;
4851
import java.util.concurrent.Executors;
4952
import java.util.concurrent.Future;
5053
import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
@@ -58,14 +61,18 @@
5861
import org.junit.runners.JUnit4;
5962
import org.mockito.ArgumentCaptor;
6063
import org.mockito.Captor;
64+
import org.mockito.MockitoAnnotations;
65+
import org.mockito.Spy;
6166
import org.mockito.stubbing.Answer;
6267

6368
@RunWith(JUnit4.class)
6469
public class PubsubLiteSinkTest {
6570
@Rule public final TestPipeline pipeline = TestPipeline.create();
6671

67-
@SuppressWarnings("unchecked")
68-
private final Publisher<PublishMetadata> publisher = mock(Publisher.class);
72+
abstract static class PublisherFakeService extends FakeApiService
73+
implements Publisher<PublishMetadata> {}
74+
75+
@Spy private PublisherFakeService publisher;
6976

7077
private final PublisherOptions defaultOptions() {
7178
try {
@@ -99,6 +106,7 @@ private void runWith(Message... messages) {
99106

100107
@Before
101108
public void setUp() throws Exception {
109+
MockitoAnnotations.initMocks(this);
102110
PerServerPublisherCache.cache.set(defaultOptions(), publisher);
103111
doAnswer(
104112
(Answer<Void>)
@@ -151,19 +159,49 @@ public void exceptionMixedWithOK() throws Exception {
151159
Message message1 = Message.builder().build();
152160
Message message2 = Message.builder().setKey(ByteString.copyFromUtf8("abc")).build();
153161
Message message3 = Message.builder().setKey(ByteString.copyFromUtf8("def")).build();
162+
SettableApiFuture<PublishMetadata> future1 = SettableApiFuture.create();
163+
SettableApiFuture<PublishMetadata> future2 = SettableApiFuture.create();
164+
SettableApiFuture<PublishMetadata> future3 = SettableApiFuture.create();
165+
CountDownLatch startedLatch = new CountDownLatch(3);
154166
when(publisher.publish(message1))
155-
.thenReturn(ApiFutures.immediateFuture(PublishMetadata.of(Partition.of(1), Offset.of(2))));
167+
.then(
168+
invocation -> {
169+
startedLatch.countDown();
170+
return future1;
171+
});
156172
when(publisher.publish(message2))
157-
.thenReturn(ApiFutures.immediateFailedFuture(Status.INTERNAL.asException()));
173+
.then(
174+
invocation -> {
175+
startedLatch.countDown();
176+
return future2;
177+
});
158178
when(publisher.publish(message3))
159-
.thenReturn(ApiFutures.immediateFuture(PublishMetadata.of(Partition.of(1), Offset.of(3))));
179+
.then(
180+
invocation -> {
181+
startedLatch.countDown();
182+
return future3;
183+
});
184+
ExecutorService exec = Executors.newCachedThreadPool();
185+
exec.execute(
186+
() -> {
187+
try {
188+
startedLatch.await();
189+
future1.set(PublishMetadata.of(Partition.of(1), Offset.of(2)));
190+
future2.setException(Status.INTERNAL.asException());
191+
future3.set(PublishMetadata.of(Partition.of(1), Offset.of(3)));
192+
} catch (StatusException | InterruptedException e) {
193+
fail();
194+
throw new RuntimeException(e);
195+
}
196+
});
160197
PipelineExecutionException e =
161198
assertThrows(PipelineExecutionException.class, () -> runWith(message1, message2, message3));
162199
verify(publisher, times(3)).publish(publishedMessageCaptor.capture());
163200
assertThat(publishedMessageCaptor.getAllValues()).containsExactly(message1, message2, message3);
164201
Optional<Status> statusOr = ExtractStatus.extract(e.getCause());
165202
assertThat(statusOr).isPresent();
166203
assertThat(statusOr.get().getCode()).isEqualTo(Status.Code.INTERNAL);
204+
exec.shutdownNow();
167205
}
168206

169207
@Test

0 commit comments

Comments
 (0)