Skip to content

Commit d99101f

Browse files
committed
Polish "Enhance PulsarConsumerTestUtil consumeMessages"
- Add test for consume w/ no args w/ default url - Add javadocs to PulsarConsumerTestUtil
1 parent 2c7e3b2 commit d99101f

File tree

4 files changed

+106
-55
lines changed

4 files changed

+106
-55
lines changed

spring-pulsar-test/spring-pulsar-test.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,5 @@ dependencies {
1111
implementation project(':spring-pulsar')
1212
testImplementation 'org.assertj:assertj-core'
1313
testImplementation 'org.junit.jupiter:junit-jupiter'
14+
testImplementation 'org.mockito:mockito-core'
1415
}

spring-pulsar-test/src/main/java/org/springframework/pulsar/test/support/PulsarConsumerTestUtil.java

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,10 @@
4242
*
4343
* @param <T> the type of the message payload
4444
* @author Jonas Geiregat
45+
* @author Kartik Shrivastava
46+
* @author Chris Bono
4547
*/
46-
public class PulsarConsumerTestUtil<T> implements TopicSpec<T>, SchemaSpec<T>, ConditionsSpec<T> {
48+
public final class PulsarConsumerTestUtil<T> implements TopicSpec<T>, SchemaSpec<T>, ConditionsSpec<T> {
4749

4850
private final PulsarConsumerFactory<T> consumerFactory;
4951

@@ -57,13 +59,26 @@ public class PulsarConsumerTestUtil<T> implements TopicSpec<T>, SchemaSpec<T>, C
5759

5860
private boolean untilMethodAlreadyCalled = false;
5961

62+
/**
63+
* Begin a builder which will consume messages from the
64+
* {@link PulsarTestContainerSupport} (if available) or the default Pulsar broker url
65+
* {@code pulsar://localhost:6650}.
66+
* @param <T> the message payload type
67+
* @return the {@link TopicSpec topic step} of the builder
68+
*/
6069
public static <T> TopicSpec<T> consumeMessages() {
6170
if (PulsarTestContainerSupport.isContainerStarted()) {
6271
return PulsarConsumerTestUtil.consumeMessages(PulsarTestContainerSupport.getPulsarBrokerUrl());
6372
}
6473
return PulsarConsumerTestUtil.consumeMessages("pulsar://localhost:6650");
6574
}
6675

76+
/**
77+
* Begin a builder which will consume messages from the specified Pulsar broker url.
78+
* @param <T> the message payload type
79+
* @param url the Pulsar broker url
80+
* @return the {@link TopicSpec topic step} of the builder
81+
*/
6782
public static <T> TopicSpec<T> consumeMessages(String url) {
6883
Assert.notNull(url, "url must not be null");
6984
try {
@@ -74,16 +89,28 @@ public static <T> TopicSpec<T> consumeMessages(String url) {
7489
}
7590
}
7691

92+
/**
93+
* Begin a builder which will consume messages with a provided Pulsar client.
94+
* @param <T> the message payload type
95+
* @param pulsarClient the client to consume with
96+
* @return the {@link TopicSpec topic step} of the builder
97+
*/
7798
public static <T> TopicSpec<T> consumeMessages(PulsarClient pulsarClient) {
7899
Assert.notNull(pulsarClient, "pulsarClient must not be null");
79100
return PulsarConsumerTestUtil.consumeMessages(new DefaultPulsarConsumerFactory<>(pulsarClient, List.of()));
80101
}
81102

103+
/**
104+
* Begin a builder which will consume messages with a provided consumer factory.
105+
* @param <T> the message payload type
106+
* @param pulsarConsumerFactory the consumer factory to consume with
107+
* @return the {@link TopicSpec topic step} of the builder
108+
*/
82109
public static <T> TopicSpec<T> consumeMessages(PulsarConsumerFactory<T> pulsarConsumerFactory) {
83110
return new PulsarConsumerTestUtil<>(pulsarConsumerFactory);
84111
}
85112

86-
public PulsarConsumerTestUtil(PulsarConsumerFactory<T> consumerFactory) {
113+
private PulsarConsumerTestUtil(PulsarConsumerFactory<T> consumerFactory) {
87114
Assert.notNull(consumerFactory, "PulsarConsumerFactory must not be null");
88115
this.consumerFactory = consumerFactory;
89116
}

spring-pulsar-test/src/main/java/org/springframework/pulsar/test/support/PulsarTestContainerSupport.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.pulsar.test.support;
1818

19+
import org.junit.jupiter.api.AfterAll;
1920
import org.junit.jupiter.api.BeforeAll;
2021
import org.testcontainers.containers.PulsarContainer;
2122
import org.testcontainers.junit.jupiter.Testcontainers;
@@ -25,6 +26,7 @@
2526
* Provides a static {@link PulsarContainer} that can be shared across test classes.
2627
*
2728
* @author Chris Bono
29+
* @author Kartik Shrivastava
2830
*/
2931
@Testcontainers(disabledWithoutDocker = true)
3032
public interface PulsarTestContainerSupport {
@@ -40,6 +42,11 @@ static void startContainer() {
4042
PULSAR_CONTAINER.start();
4143
}
4244

45+
@AfterAll
46+
static void stopContainer() {
47+
PULSAR_CONTAINER.stop();
48+
}
49+
4350
static String getPulsarBrokerUrl() {
4451
return PULSAR_CONTAINER.getPulsarBrokerUrl();
4552
}
@@ -52,8 +59,4 @@ static boolean isContainerStarted() {
5259
return PULSAR_CONTAINER.isRunning();
5360
}
5461

55-
static void stopContainer() {
56-
PULSAR_CONTAINER.stop();
57-
}
58-
5962
}

spring-pulsar-test/src/test/java/org/springframework/pulsar/test/support/PulsarConsumerTestUtilTests.java

Lines changed: 69 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,13 @@
2929
import org.apache.pulsar.client.api.PulsarClient;
3030
import org.apache.pulsar.client.api.PulsarClientException;
3131
import org.apache.pulsar.client.api.Schema;
32+
import org.junit.jupiter.api.AfterEach;
3233
import org.junit.jupiter.api.BeforeEach;
3334
import org.junit.jupiter.api.Test;
35+
import org.mockito.MockedStatic;
36+
import org.mockito.Mockito;
3437

38+
import org.springframework.pulsar.PulsarException;
3539
import org.springframework.pulsar.core.DefaultPulsarConsumerFactory;
3640
import org.springframework.pulsar.core.DefaultPulsarProducerFactory;
3741
import org.springframework.pulsar.core.PulsarConsumerFactory;
@@ -41,6 +45,8 @@
4145
* Tests for {@link PulsarConsumerTestUtil}.
4246
*
4347
* @author Jonas Geiregat
48+
* @author Kartik Shrivastava
49+
* @author Chris Bono
4450
*/
4551
class PulsarConsumerTestUtilTests implements PulsarTestContainerSupport {
4652

@@ -61,9 +67,16 @@ void prepareForTest() throws PulsarClientException {
6167
this.pulsarTemplate = new PulsarTemplate<>(new DefaultPulsarProducerFactory<>(pulsarClient));
6268
}
6369

70+
@AfterEach
71+
void cleanupFromTest() throws PulsarClientException {
72+
if (this.pulsarClient != null) {
73+
this.pulsarClient.close();
74+
}
75+
}
76+
6477
@Test
65-
void whenConditionIsSpecifiedMessagesAreConsumedUntilConditionIsMet() {
66-
var topic = testTopic("a");
78+
void whenConditionIsSpecifiedThenMessagesConsumedUntilConditionMet() {
79+
var topic = testTopic("cond");
6780
IntStream.range(0, 5).forEach(i -> pulsarTemplate.send(topic, "message-" + i));
6881
var msgs = PulsarConsumerTestUtil.consumeMessages(pulsarConsumerFactory)
6982
.fromTopic(topic)
@@ -75,8 +88,8 @@ void whenConditionIsSpecifiedMessagesAreConsumedUntilConditionIsMet() {
7588
}
7689

7790
@Test
78-
void whenConditionIsNotSpecifiedMessagesAreConsumedUntilAwaitDuration() {
79-
var topic = testTopic("b");
91+
void whenConditionIsNotSpecifiedThenMessagesAreConsumedUntilAwaitDuration() {
92+
var topic = testTopic("no-cond");
8093
IntStream.range(0, 5).forEach(i -> pulsarTemplate.send(topic, "message-" + i));
8194
var msgs = PulsarConsumerTestUtil.consumeMessages(pulsarConsumerFactory)
8295
.fromTopic(topic)
@@ -89,10 +102,25 @@ void whenConditionIsNotSpecifiedMessagesAreConsumedUntilAwaitDuration() {
89102
}
90103

91104
@Test
92-
void exceptionIsThrownWhenConditionNotMetWithinAwaitDuration() {
105+
void whenChainedConditionsAreSpecifiedThenMessagesConsumedUntilAllConditionsMet() {
106+
var topic = testTopic("chained-cond");
107+
IntStream.range(0, 5).forEach(i -> pulsarTemplate.send(topic, "message-" + i));
108+
ConsumedMessagesCondition<String> condition1 = ConsumedMessagesConditions.desiredMessageCount(5);
109+
ConsumedMessagesCondition<String> condition2 = ConsumedMessagesConditions.atLeastOneMessageMatches("message-1");
110+
var msgs = PulsarConsumerTestUtil.consumeMessages(pulsarConsumerFactory)
111+
.fromTopic(topic)
112+
.withSchema(Schema.STRING)
113+
.awaitAtMost(Duration.ofSeconds(5))
114+
.until(condition1.and(condition2))
115+
.get();
116+
assertThat(msgs).hasSize(5);
117+
}
118+
119+
@Test
120+
void whenConditionNotMetWithinAwaitDurationThenExceptionIsThrown() {
93121
assertThatExceptionOfType(ConditionTimeoutException.class)
94122
.isThrownBy(() -> PulsarConsumerTestUtil.consumeMessages(pulsarConsumerFactory)
95-
.fromTopic(testTopic("c"))
123+
.fromTopic(testTopic("cond-not-met"))
96124
.withSchema(Schema.STRING)
97125
.awaitAtMost(Duration.ofSeconds(5))
98126
.until(ConsumedMessagesConditions.desiredMessageCount(3))
@@ -101,9 +129,8 @@ void exceptionIsThrownWhenConditionNotMetWithinAwaitDuration() {
101129
}
102130

103131
@Test
104-
void messagesAreConsumedWhenContainerIsRunningAndConsumeMessagesIsCalledWithoutArguments() {
105-
// depends upon pulsarClient created in prepareForTest
106-
var topic = testTopic("e1");
132+
void consumeMessagesWithNoArgsUsesPulsarContainerIfAvailable() {
133+
var topic = testTopic("no-arg");
107134
IntStream.range(0, 2).forEach(i -> pulsarTemplate.send(topic, "message-" + i));
108135
var msgs = PulsarConsumerTestUtil.<String>consumeMessages()
109136
.fromTopic(topic)
@@ -115,15 +142,24 @@ void messagesAreConsumedWhenContainerIsRunningAndConsumeMessagesIsCalledWithoutA
115142
}
116143

117144
@Test
118-
void messagesAreConsumedWhenContainerIsStoppedAndConsumeMessagesIsCalledWithoutArguments() {
119-
PulsarTestContainerSupport.stopContainer();
120-
PulsarConsumerTestUtil.<String>consumeMessages();
121-
// TODO: Complete this test
145+
void consumeMessagesWithNoArgsUsesDefaultUrlWhenPulsarContainerNotAvailable() {
146+
try (MockedStatic<PulsarTestContainerSupport> containerSupport = Mockito
147+
.mockStatic(PulsarTestContainerSupport.class)) {
148+
containerSupport.when(PulsarTestContainerSupport::isContainerStarted).thenReturn(false);
149+
var topic = testTopic("no-arg-dft-url");
150+
assertThatExceptionOfType(PulsarException.class)
151+
.isThrownBy(() -> PulsarConsumerTestUtil.<String>consumeMessages()
152+
.fromTopic(topic)
153+
.withSchema(Schema.STRING)
154+
.awaitAtMost(Duration.ofSeconds(2))
155+
.get())
156+
.withStackTraceContaining("Connection refused: localhost");
157+
}
122158
}
123159

124160
@Test
125-
void messagesAreConsumedWhenConsumeMessagesIsCalledWithBrokerUrl() {
126-
var topic = testTopic("e2");
161+
void consumeMessagesWithBrokerUrl() {
162+
var topic = testTopic("url-arg");
127163
IntStream.range(0, 2).forEach(i -> pulsarTemplate.send(topic, "message-" + i));
128164
var msgs = PulsarConsumerTestUtil.<String>consumeMessages(PulsarTestContainerSupport.getPulsarBrokerUrl())
129165
.fromTopic(topic)
@@ -135,15 +171,8 @@ void messagesAreConsumedWhenConsumeMessagesIsCalledWithBrokerUrl() {
135171
}
136172

137173
@Test
138-
void exceptionIsThrownWhenConsumeMessagesIsCalledWithNullBrokerUrl() {
139-
String url = null;
140-
assertThatIllegalArgumentException().isThrownBy(() -> PulsarConsumerTestUtil.consumeMessages(url))
141-
.withMessage("url must not be null");
142-
}
143-
144-
@Test
145-
void messagesAreConsumedWhenConsumeMessagesIsCalledWithPulsarClient() {
146-
var topic = testTopic("e3");
174+
void consumeMessagesWithPulsarClient() {
175+
var topic = testTopic("client-arg");
147176
IntStream.range(0, 2).forEach(i -> pulsarTemplate.send(topic, "message-" + i));
148177
var msgs = PulsarConsumerTestUtil.<String>consumeMessages(this.pulsarClient)
149178
.fromTopic(topic)
@@ -155,31 +184,8 @@ void messagesAreConsumedWhenConsumeMessagesIsCalledWithPulsarClient() {
155184
}
156185

157186
@Test
158-
void exceptionIsThrownWhenConsumeMessagesIsCalledWithNullPulsarClient() {
159-
PulsarClient localPulsarClient = null;
160-
assertThatIllegalArgumentException().isThrownBy(() -> PulsarConsumerTestUtil.consumeMessages(localPulsarClient))
161-
.withMessage("pulsarClient must not be null");
162-
}
163-
164-
@Test
165-
void whenChainedConditionAreSpecifiedMessagesAreConsumedUntilTheyAreMet() {
166-
var topic = testTopic("d");
167-
IntStream.range(0, 5).forEach(i -> pulsarTemplate.send(topic, "message-" + i));
168-
ConsumedMessagesCondition<String> condition1 = ConsumedMessagesConditions.desiredMessageCount(5);
169-
ConsumedMessagesCondition<String> condition2 = ConsumedMessagesConditions.atLeastOneMessageMatches("message-1");
170-
var msgs = PulsarConsumerTestUtil.consumeMessages(pulsarConsumerFactory)
171-
.fromTopic(topic)
172-
.withSchema(Schema.STRING)
173-
.awaitAtMost(Duration.ofSeconds(5))
174-
.until(condition1.and(condition2))
175-
.get();
176-
assertThat(msgs).hasSize(5);
177-
}
178-
179-
@Test
180-
void exceptionIsThrownWhenUntilIsCalledMultipleTimes() {
181-
var topic = testTopic("e");
182-
IntStream.range(0, 1).forEach(i -> pulsarTemplate.send(topic, "message-" + i));
187+
void untilCannotBeCalledMultipleTimes() {
188+
var topic = testTopic("until-multi");
183189
assertThatExceptionOfType(IllegalStateException.class)
184190
.isThrownBy(() -> PulsarConsumerTestUtil.consumeMessages(pulsarConsumerFactory)
185191
.fromTopic(topic)
@@ -191,6 +197,20 @@ void exceptionIsThrownWhenUntilIsCalledMultipleTimes() {
191197
.withMessage("Multiple calls to 'until' are not allowed. Use 'and' to combine conditions.");
192198
}
193199

200+
@Test
201+
void brokerUrlCannotBeNull() {
202+
String url = null;
203+
assertThatIllegalArgumentException().isThrownBy(() -> PulsarConsumerTestUtil.consumeMessages(url))
204+
.withMessage("url must not be null");
205+
}
206+
207+
@Test
208+
void pulsarClientCannotBeNull() {
209+
PulsarClient localPulsarClient = null;
210+
assertThatIllegalArgumentException().isThrownBy(() -> PulsarConsumerTestUtil.consumeMessages(localPulsarClient))
211+
.withMessage("pulsarClient must not be null");
212+
}
213+
194214
@Test
195215
void consumerFactoryCannotBeNull() {
196216
PulsarConsumerFactory<String> consumerFactory = null;

0 commit comments

Comments
 (0)