Skip to content

Commit 5d399d8

Browse files
lhotarinikhil-ctds
authored andcommitted
[fix][test] Fix quiet time implementation in BrokerTestUtil.receiveMessages (apache#23876)
(cherry picked from commit 52e8730) (cherry picked from commit 720184d)
1 parent 30e25e8 commit 5d399d8

File tree

2 files changed

+413
-1
lines changed

2 files changed

+413
-1
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java

Lines changed: 298 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,42 @@
1818
*/
1919
package org.apache.pulsar.broker;
2020

21+
import static org.mockito.Mockito.mock;
22+
import static org.mockito.Mockito.when;
23+
import com.fasterxml.jackson.core.JsonGenerator;
24+
import com.fasterxml.jackson.databind.ObjectMapper;
25+
import com.fasterxml.jackson.databind.ObjectWriter;
26+
import java.io.BufferedReader;
27+
import java.io.IOException;
28+
import java.io.InputStreamReader;
29+
import java.io.StringWriter;
30+
import java.io.UncheckedIOException;
31+
import java.net.HttpURLConnection;
32+
import java.net.URL;
33+
import java.time.Duration;
34+
import java.util.Arrays;
2135
import java.util.UUID;
36+
import java.util.concurrent.CompletableFuture;
37+
import java.util.concurrent.CompletionException;
38+
import java.util.concurrent.ExecutionException;
39+
import java.util.concurrent.TimeUnit;
40+
import java.util.concurrent.TimeoutException;
41+
import java.util.concurrent.atomic.AtomicInteger;
42+
import java.util.concurrent.atomic.AtomicLong;
43+
import java.util.function.BiConsumer;
44+
import java.util.function.BiFunction;
45+
import java.util.stream.Stream;
46+
import lombok.SneakyThrows;
47+
import org.apache.commons.lang3.tuple.Pair;
48+
import org.apache.pulsar.client.admin.PulsarAdmin;
49+
import org.apache.pulsar.client.admin.PulsarAdminException;
50+
import org.apache.pulsar.client.api.Consumer;
51+
import org.apache.pulsar.client.api.Message;
52+
import org.apache.pulsar.client.api.PulsarClientException;
53+
import org.apache.pulsar.common.util.FutureUtil;
54+
import org.apache.pulsar.common.util.ObjectMapperFactory;
2255
import org.mockito.Mockito;
23-
56+
import org.slf4j.Logger;
2457
/**
2558
* Holds util methods used in test.
2659
*/
@@ -77,4 +110,268 @@ public static <T> T spyWithoutRecordingInvocations(T object) {
77110
.defaultAnswer(Mockito.CALLS_REAL_METHODS)
78111
.stubOnly());
79112
}
113+
114+
/**
115+
* Uses Jackson to create a JSON string for the given object
116+
* @param object to convert to JSON
117+
* @return JSON string
118+
*/
119+
public static String toJson(Object object) {
120+
ObjectWriter writer = ObjectMapperFactory.getMapper().writer();
121+
StringWriter stringWriter = new StringWriter();
122+
try (JsonGenerator generator = writer.createGenerator(stringWriter).useDefaultPrettyPrinter()) {
123+
generator.writeObject(object);
124+
} catch (IOException e) {
125+
throw new UncheckedIOException(e);
126+
}
127+
return stringWriter.toString();
128+
}
129+
130+
/**
131+
* Logs the topic stats and internal stats for the given topic
132+
* @param logger logger to use
133+
* @param pulsarAdmin PulsarAdmin client to use
134+
* @param topic topic name
135+
*/
136+
public static void logTopicStats(Logger logger, PulsarAdmin pulsarAdmin, String topic) {
137+
try {
138+
logger.info("[{}] stats: {}", topic, toJson(pulsarAdmin.topics().getStats(topic)));
139+
logger.info("[{}] internalStats: {}", topic,
140+
toJson(pulsarAdmin.topics().getInternalStats(topic, true)));
141+
} catch (PulsarAdminException e) {
142+
logger.warn("Failed to get stats for topic {}", topic, e);
143+
}
144+
}
145+
146+
/**
147+
* Logs the topic stats and internal stats for the given topic
148+
* @param logger logger to use
149+
* @param baseUrl Pulsar service URL
150+
* @param topic topic name
151+
*/
152+
public static void logTopicStats(Logger logger, String baseUrl, String topic) {
153+
logTopicStats(logger, baseUrl, "public", "default", topic);
154+
}
155+
156+
/**
157+
* Logs the topic stats and internal stats for the given topic
158+
* @param logger logger to use
159+
* @param baseUrl Pulsar service URL
160+
* @param tenant tenant name
161+
* @param namespace namespace name
162+
* @param topic topic name
163+
*/
164+
public static void logTopicStats(Logger logger, String baseUrl, String tenant, String namespace, String topic) {
165+
String topicStatsUri =
166+
String.format("%s/admin/v2/persistent/%s/%s/%s/stats", baseUrl, tenant, namespace, topic);
167+
logger.info("[{}] stats: {}", topic, jsonPrettyPrint(getJsonResourceAsString(topicStatsUri)));
168+
String topicStatsInternalUri =
169+
String.format("%s/admin/v2/persistent/%s/%s/%s/internalStats", baseUrl, tenant, namespace, topic);
170+
logger.info("[{}] internalStats: {}", topic, jsonPrettyPrint(getJsonResourceAsString(topicStatsInternalUri)));
171+
}
172+
173+
/**
174+
* Pretty print the given JSON string
175+
* @param jsonString JSON string to pretty print
176+
* @return pretty printed JSON string
177+
*/
178+
public static String jsonPrettyPrint(String jsonString) {
179+
try {
180+
ObjectMapper mapper = new ObjectMapper();
181+
Object json = mapper.readValue(jsonString, Object.class);
182+
ObjectWriter writer = mapper.writerWithDefaultPrettyPrinter();
183+
return writer.writeValueAsString(json);
184+
} catch (IOException e) {
185+
throw new UncheckedIOException(e);
186+
}
187+
}
188+
189+
/**
190+
* Get the resource as a string from the given URI
191+
*/
192+
@SneakyThrows
193+
public static String getJsonResourceAsString(String uri) {
194+
URL url = new URL(uri);
195+
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
196+
connection.setRequestMethod("GET");
197+
connection.setRequestProperty("Accept", "application/json");
198+
try {
199+
int responseCode = connection.getResponseCode();
200+
if (responseCode == 200) {
201+
try (BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream()))) {
202+
String inputLine;
203+
StringBuilder content = new StringBuilder();
204+
while ((inputLine = in.readLine()) != null) {
205+
content.append(inputLine);
206+
}
207+
return content.toString();
208+
}
209+
} else {
210+
throw new IOException("Failed to get resource: " + uri + ", status: " + responseCode);
211+
}
212+
} finally {
213+
connection.disconnect();
214+
}
215+
}
216+
217+
/**
218+
* Receive messages concurrently from multiple consumers and handles them using the provided message handler.
219+
* The message handler should return true if it wants to continue receiving more messages, false otherwise.
220+
*
221+
* @param messageHandler the message handler
222+
* @param quietTimeout the duration of quiet time after which the method will stop waiting for more messages
223+
* @param consumers the consumers to receive messages from
224+
* @param <T> the message value type
225+
*/
226+
public static <T> void receiveMessages(BiFunction<Consumer<T>, Message<T>, Boolean> messageHandler,
227+
Duration quietTimeout,
228+
Consumer<T>... consumers) {
229+
receiveMessages(messageHandler, quietTimeout, Arrays.stream(consumers));
230+
}
231+
232+
/**
233+
* Receive messages concurrently from multiple consumers and handles them using the provided message handler.
234+
* The message handler should return true if it wants to continue receiving more messages, false otherwise.
235+
*
236+
* @param messageHandler the message handler
237+
* @param quietTimeout the duration of quiet time after which the method will stop waiting for more messages
238+
* @param consumers the consumers to receive messages from
239+
* @param <T> the message value type
240+
*/
241+
public static <T> void receiveMessages(BiFunction<Consumer<T>, Message<T>, Boolean> messageHandler,
242+
Duration quietTimeout,
243+
Stream<Consumer<T>> consumers) {
244+
long quietTimeoutNanos = quietTimeout.toNanos();
245+
AtomicLong lastMessageReceivedNanos = new AtomicLong(System.nanoTime());
246+
FutureUtil.waitForAll(consumers
247+
.map(consumer -> receiveMessagesAsync(consumer, quietTimeoutNanos, quietTimeoutNanos, messageHandler,
248+
lastMessageReceivedNanos)).toList()).join();
249+
}
250+
251+
// asynchronously receive messages from a consumer and handle them using the provided message handler
252+
// the benefit is that multiple consumers can be concurrently consumed without the need to have multiple threads
253+
// this is useful in tests where multiple consumers are needed to test the functionality
254+
private static <T> CompletableFuture<Void> receiveMessagesAsync(Consumer<T> consumer,
255+
long quietTimeoutNanos,
256+
long receiveTimeoutNanos,
257+
BiFunction<Consumer<T>, Message<T>, Boolean>
258+
messageHandler,
259+
AtomicLong lastMessageReceivedNanos) {
260+
return consumer.receiveAsync()
261+
.orTimeout(receiveTimeoutNanos, TimeUnit.NANOSECONDS)
262+
.handle((msg, t) -> {
263+
long currentNanos = System.nanoTime();
264+
if (t != null) {
265+
if (t instanceof TimeoutException) {
266+
long sinceLastMessageReceivedNanos = currentNanos - lastMessageReceivedNanos.get();
267+
if (sinceLastMessageReceivedNanos > quietTimeoutNanos) {
268+
return Pair.of(false, 0L);
269+
} else {
270+
return Pair.of(true, quietTimeoutNanos - sinceLastMessageReceivedNanos);
271+
}
272+
} else {
273+
throw FutureUtil.wrapToCompletionException(t);
274+
}
275+
}
276+
lastMessageReceivedNanos.set(currentNanos);
277+
return Pair.of(messageHandler.apply(consumer, msg), quietTimeoutNanos);
278+
}).thenComposeAsync(receiveMoreAndNextTimeout -> {
279+
boolean receiveMore = receiveMoreAndNextTimeout.getLeft();
280+
if (receiveMore) {
281+
Long nextReceiveTimeoutNanos = receiveMoreAndNextTimeout.getRight();
282+
return receiveMessagesAsync(consumer, quietTimeoutNanos, nextReceiveTimeoutNanos,
283+
messageHandler, lastMessageReceivedNanos);
284+
} else {
285+
return CompletableFuture.completedFuture(null);
286+
}
287+
});
288+
}
289+
290+
/**
291+
* Receive messages concurrently from multiple consumers and handles them using the provided message handler.
292+
* The messages are received until the quiet timeout is reached or the maximum number of messages is received.
293+
*
294+
* @param messageHandler the message handler
295+
* @param quietTimeout the duration of quiet time after which the method will stop waiting for more messages
296+
* @param maxMessages the maximum number of messages to receive
297+
* @param consumers the consumers to receive messages from
298+
* @param <T> the message value type
299+
*/
300+
public static <T> void receiveMessagesN(BiConsumer<Consumer<T>, Message<T>> messageHandler,
301+
Duration quietTimeout,
302+
int maxMessages,
303+
Consumer<T>... consumers)
304+
throws ExecutionException, InterruptedException {
305+
AtomicInteger messagesReceived = new AtomicInteger();
306+
receiveMessages(
307+
(consumer, message) -> {
308+
messageHandler.accept(consumer, message);
309+
return messagesReceived.incrementAndGet() < maxMessages;
310+
}, quietTimeout, consumers);
311+
}
312+
313+
/**
314+
* Receive messages concurrently from multiple consumers and handles them using the provided message handler.
315+
*
316+
* @param messageHandler the message handler
317+
* @param quietTimeout the duration of quiet time after which the method will stop waiting for more messages
318+
* @param consumers the consumers to receive messages from
319+
* @param <T> the message value type
320+
*/
321+
public static <T> void receiveMessagesInThreads(BiFunction<Consumer<T>, Message<T>, Boolean> messageHandler,
322+
final Duration quietTimeout,
323+
Consumer<T>... consumers) {
324+
receiveMessagesInThreads(messageHandler, quietTimeout, Arrays.stream(consumers).sequential());
325+
}
326+
327+
/**
328+
* Receive messages concurrently from multiple consumers and handles them using the provided message handler.
329+
*
330+
* @param messageHandler the message handler
331+
* @param quietTimeout the duration of quiet time after which the method will stop waiting for more messages
332+
* @param consumers the consumers to receive messages from
333+
* @param <T> the message value type
334+
*/
335+
public static <T> void receiveMessagesInThreads(BiFunction<Consumer<T>, Message<T>, Boolean> messageHandler,
336+
final Duration quietTimeout,
337+
Stream<Consumer<T>> consumers) {
338+
FutureUtil.waitForAll(consumers.map(consumer -> {
339+
return CompletableFuture.runAsync(() -> {
340+
try {
341+
while (!Thread.currentThread().isInterrupted()) {
342+
Message<T> msg = consumer.receive((int) quietTimeout.toMillis(), TimeUnit.MILLISECONDS);
343+
if (msg != null) {
344+
if (!messageHandler.apply(consumer, msg)) {
345+
break;
346+
}
347+
} else {
348+
break;
349+
}
350+
}
351+
} catch (PulsarClientException e) {
352+
throw new CompletionException(e);
353+
}
354+
}, runnable -> {
355+
Thread thread = new Thread(runnable, "Consumer-" + consumer.getConsumerName());
356+
thread.start();
357+
});
358+
}).toList()).join();
359+
}
360+
361+
private static long mockConsumerIdGenerator = 0;
362+
363+
public static org.apache.pulsar.broker.service.Consumer createMockConsumer(String consumerName) {
364+
long consumerId = mockConsumerIdGenerator++;
365+
return createMockConsumer(consumerName, consumerName + " consumerId:" + consumerId, consumerId);
366+
}
367+
368+
public static org.apache.pulsar.broker.service.Consumer createMockConsumer(String consumerName, String toString, long consumerId) {
369+
// without stubOnly, the mock will record method invocations and could run into OOME
370+
org.apache.pulsar.broker.service.Consumer
371+
consumer = mock(org.apache.pulsar.broker.service.Consumer.class, Mockito.withSettings().stubOnly());
372+
when(consumer.consumerName()).thenReturn(consumerName);
373+
when(consumer.toString()).thenReturn(consumerName + " consumerId:" + consumerId);
374+
when(consumer.consumerId()).thenReturn(consumerId);
375+
return consumer;
376+
}
80377
}

0 commit comments

Comments
 (0)