Skip to content

Commit 952c8ba

Browse files
authored
Close created client in PulsarConsumerTestUtil (#618)
1 parent 9bc6bda commit 952c8ba

File tree

2 files changed

+62
-20
lines changed

2 files changed

+62
-20
lines changed

spring-pulsar-test/src/main/java/org/springframework/pulsar/test/support/PulsarConsumerTestUtil.java

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
import org.apache.pulsar.client.api.Schema;
3030
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
3131

32+
import org.springframework.core.log.LogAccessor;
33+
import org.springframework.lang.Nullable;
3234
import org.springframework.pulsar.PulsarException;
3335
import org.springframework.pulsar.core.DefaultPulsarConsumerFactory;
3436
import org.springframework.pulsar.core.PulsarConsumerFactory;
@@ -47,6 +49,10 @@
4749
*/
4850
public final class PulsarConsumerTestUtil<T> implements TopicSpec<T>, SchemaSpec<T>, ConditionsSpec<T> {
4951

52+
private static final LogAccessor LOG = new LogAccessor(PulsarConsumerTestUtil.class);
53+
54+
private final PulsarClient locallyCreatedPulsarClient;
55+
5056
private final PulsarConsumerFactory<T> consumerFactory;
5157

5258
private ConsumedMessagesCondition<T> condition;
@@ -82,7 +88,9 @@ public static <T> TopicSpec<T> consumeMessages() {
8288
public static <T> TopicSpec<T> consumeMessages(String url) {
8389
Assert.notNull(url, "url must not be null");
8490
try {
85-
return PulsarConsumerTestUtil.consumeMessages(PulsarClient.builder().serviceUrl(url).build());
91+
var pulsarClient = PulsarClient.builder().serviceUrl(url).build();
92+
return PulsarConsumerTestUtil.consumeMessagesInternal(pulsarClient,
93+
new DefaultPulsarConsumerFactory<>(pulsarClient, List.of()));
8694
}
8795
catch (PulsarClientException ex) {
8896
throw new PulsarException(ex);
@@ -97,7 +105,8 @@ public static <T> TopicSpec<T> consumeMessages(String url) {
97105
*/
98106
public static <T> TopicSpec<T> consumeMessages(PulsarClient pulsarClient) {
99107
Assert.notNull(pulsarClient, "pulsarClient must not be null");
100-
return PulsarConsumerTestUtil.consumeMessages(new DefaultPulsarConsumerFactory<>(pulsarClient, List.of()));
108+
return PulsarConsumerTestUtil.consumeMessagesInternal(null,
109+
new DefaultPulsarConsumerFactory<>(pulsarClient, List.of()));
101110
}
102111

103112
/**
@@ -107,12 +116,19 @@ public static <T> TopicSpec<T> consumeMessages(PulsarClient pulsarClient) {
107116
* @return the {@link TopicSpec topic step} of the builder
108117
*/
109118
public static <T> TopicSpec<T> consumeMessages(PulsarConsumerFactory<T> pulsarConsumerFactory) {
110-
return new PulsarConsumerTestUtil<>(pulsarConsumerFactory);
119+
return PulsarConsumerTestUtil.consumeMessagesInternal(null, pulsarConsumerFactory);
120+
}
121+
122+
private static <T> TopicSpec<T> consumeMessagesInternal(PulsarClient locallyCreatedPulsarClient,
123+
PulsarConsumerFactory<T> pulsarConsumerFactory) {
124+
return new PulsarConsumerTestUtil<>(locallyCreatedPulsarClient, pulsarConsumerFactory);
111125
}
112126

113-
private PulsarConsumerTestUtil(PulsarConsumerFactory<T> consumerFactory) {
127+
private PulsarConsumerTestUtil(@Nullable PulsarClient locallyCreatedPulsarClient,
128+
PulsarConsumerFactory<T> consumerFactory) {
114129
Assert.notNull(consumerFactory, "PulsarConsumerFactory must not be null");
115130
this.consumerFactory = consumerFactory;
131+
this.locallyCreatedPulsarClient = locallyCreatedPulsarClient;
116132
}
117133

118134
@Override
@@ -173,6 +189,16 @@ public List<Message<T>> get() {
173189
catch (PulsarClientException ex) {
174190
throw new PulsarException(ex);
175191
}
192+
finally {
193+
if (this.locallyCreatedPulsarClient != null && !this.locallyCreatedPulsarClient.isClosed()) {
194+
try {
195+
this.locallyCreatedPulsarClient.close();
196+
}
197+
catch (PulsarClientException e) {
198+
LOG.error(e, () -> "Failed to close locally created Pulsar client due to: " + e.getMessage());
199+
}
200+
}
201+
}
176202
if (this.condition != null && !this.condition.meets(messages)) {
177203
throw new ConditionTimeoutException(
178204
"Condition was not met within %d seconds".formatted(timeout.toSeconds()));

spring-pulsar-test/src/test/java/org/springframework/pulsar/test/support/PulsarConsumerTestUtilTests.java

Lines changed: 32 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.pulsar.client.api.PulsarClient;
3030
import org.apache.pulsar.client.api.PulsarClientException;
3131
import org.apache.pulsar.client.api.Schema;
32+
import org.assertj.core.api.InstanceOfAssertFactories;
3233
import org.junit.jupiter.api.AfterEach;
3334
import org.junit.jupiter.api.BeforeEach;
3435
import org.junit.jupiter.api.Test;
@@ -78,13 +79,13 @@ void cleanupFromTest() throws PulsarClientException {
7879
void whenConditionIsSpecifiedThenMessagesConsumedUntilConditionMet() {
7980
var topic = testTopic("cond");
8081
IntStream.range(0, 5).forEach(i -> pulsarTemplate.send(topic, "message-" + i));
81-
var msgs = PulsarConsumerTestUtil.consumeMessages(pulsarConsumerFactory)
82+
var consumerTestUtil = PulsarConsumerTestUtil.consumeMessages(pulsarConsumerFactory)
8283
.fromTopic(topic)
8384
.withSchema(Schema.STRING)
8485
.awaitAtMost(Duration.ofSeconds(5))
85-
.until(desiredMessageCount(3))
86-
.get();
87-
assertThat(msgs).hasSize(3);
86+
.until(desiredMessageCount(3));
87+
assertThat(consumerTestUtil.get()).hasSize(3);
88+
assertThatLocallyCreatedClientIsNull(consumerTestUtil);
8889
}
8990

9091
@Test
@@ -132,17 +133,18 @@ void whenConditionNotMetWithinAwaitDurationThenExceptionIsThrown() {
132133
void consumeMessagesWithNoArgsUsesPulsarContainerIfAvailable() {
133134
var topic = testTopic("no-arg");
134135
IntStream.range(0, 2).forEach(i -> pulsarTemplate.send(topic, "message-" + i));
135-
var msgs = PulsarConsumerTestUtil.<String>consumeMessages()
136+
var consumerTestUtil = PulsarConsumerTestUtil.<String>consumeMessages()
136137
.fromTopic(topic)
137138
.withSchema(Schema.STRING)
138139
.awaitAtMost(Duration.ofSeconds(5))
139-
.until(desiredMessageCount(2))
140-
.get();
141-
assertThat(msgs).hasSize(2);
140+
.until(desiredMessageCount(2));
141+
assertThat(consumerTestUtil.get()).hasSize(2);
142+
assertThatLocallyCreatedClientIsClosed(consumerTestUtil);
142143
}
143144

144145
@Test
145146
void consumeMessagesWithNoArgsUsesDefaultUrlWhenPulsarContainerNotAvailable() {
147+
// @formatter::off
146148
try (MockedStatic<PulsarTestContainerSupport> containerSupport = Mockito
147149
.mockStatic(PulsarTestContainerSupport.class)) {
148150
containerSupport.when(PulsarTestContainerSupport::isContainerStarted).thenReturn(false);
@@ -155,32 +157,46 @@ void consumeMessagesWithNoArgsUsesDefaultUrlWhenPulsarContainerNotAvailable() {
155157
.get())
156158
.withStackTraceContaining("Connection refused: localhost");
157159
}
160+
// @formatter:on
158161
}
159162

160163
@Test
161164
void consumeMessagesWithBrokerUrl() {
162165
var topic = testTopic("url-arg");
163166
IntStream.range(0, 2).forEach(i -> pulsarTemplate.send(topic, "message-" + i));
164-
var msgs = PulsarConsumerTestUtil.<String>consumeMessages(PulsarTestContainerSupport.getPulsarBrokerUrl())
167+
var consumerTestUtil = PulsarConsumerTestUtil
168+
.<String>consumeMessages(PulsarTestContainerSupport.getPulsarBrokerUrl())
165169
.fromTopic(topic)
166170
.withSchema(Schema.STRING)
167171
.awaitAtMost(Duration.ofSeconds(5))
168-
.until(desiredMessageCount(2))
169-
.get();
170-
assertThat(msgs).hasSize(2);
172+
.until(desiredMessageCount(2));
173+
assertThat(consumerTestUtil.get()).hasSize(2);
174+
assertThatLocallyCreatedClientIsClosed(consumerTestUtil);
171175
}
172176

173177
@Test
174178
void consumeMessagesWithPulsarClient() {
175179
var topic = testTopic("client-arg");
176180
IntStream.range(0, 2).forEach(i -> pulsarTemplate.send(topic, "message-" + i));
177-
var msgs = PulsarConsumerTestUtil.<String>consumeMessages(this.pulsarClient)
181+
var consumerTestUtil = PulsarConsumerTestUtil.<String>consumeMessages(this.pulsarClient)
178182
.fromTopic(topic)
179183
.withSchema(Schema.STRING)
180184
.awaitAtMost(Duration.ofSeconds(5))
181-
.until(desiredMessageCount(2))
182-
.get();
183-
assertThat(msgs).hasSize(2);
185+
.until(desiredMessageCount(2));
186+
assertThat(consumerTestUtil.get()).hasSize(2);
187+
assertThatLocallyCreatedClientIsNull(consumerTestUtil);
188+
}
189+
190+
private void assertThatLocallyCreatedClientIsNull(ConditionsSpec<?> consumerTestUtil) {
191+
assertThat(consumerTestUtil).extracting("locallyCreatedPulsarClient").isNull();
192+
}
193+
194+
private void assertThatLocallyCreatedClientIsClosed(ConditionsSpec<?> consumerTestUtil) {
195+
assertThat(consumerTestUtil).extracting("locallyCreatedPulsarClient")
196+
.isNotNull()
197+
.asInstanceOf(InstanceOfAssertFactories.type(PulsarClient.class))
198+
.extracting(PulsarClient::isClosed)
199+
.isEqualTo(Boolean.TRUE);
184200
}
185201

186202
@Test

0 commit comments

Comments
 (0)