Skip to content

Commit 67dafa1

Browse files
[improve][client] Test no exception could be thrown for invalid epoch in message (#25013)
1 parent ec609af commit 67dafa1

File tree

2 files changed

+123
-2
lines changed

2 files changed

+123
-2
lines changed
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.client.impl;
20+
21+
import io.netty.buffer.Unpooled;
22+
import java.nio.charset.StandardCharsets;
23+
import java.util.List;
24+
import java.util.Map;
25+
import java.util.concurrent.ConcurrentHashMap;
26+
import java.util.concurrent.CopyOnWriteArrayList;
27+
import java.util.concurrent.TimeUnit;
28+
import java.util.function.BiConsumer;
29+
import lombok.Cleanup;
30+
import lombok.extern.slf4j.Slf4j;
31+
import org.apache.pulsar.client.api.ProducerConsumerBase;
32+
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
33+
import org.apache.pulsar.client.util.ExecutorProvider;
34+
import org.apache.pulsar.common.api.proto.BaseCommand;
35+
import org.apache.pulsar.common.api.proto.MessageMetadata;
36+
import org.apache.pulsar.common.protocol.Commands;
37+
import org.testng.Assert;
38+
import org.testng.annotations.AfterClass;
39+
import org.testng.annotations.BeforeClass;
40+
import org.testng.annotations.Test;
41+
42+
@Slf4j
43+
@Test(groups = "broker-impl")
44+
public class MockMessageTest extends ProducerConsumerBase {
45+
46+
private final Map<Thread, List<Throwable>> threadFailures = new ConcurrentHashMap<>();
47+
48+
@BeforeClass
49+
@Override
50+
protected void setup() throws Exception {
51+
super.internalSetup();
52+
super.producerBaseSetup();
53+
}
54+
55+
@AfterClass
56+
@Override
57+
protected void cleanup() throws Exception {
58+
super.internalCleanup();
59+
}
60+
61+
@Test
62+
public void testMessageWithWrongEpoch() throws Exception {
63+
threadFailures.clear();
64+
final var conf = new ClientConfigurationData();
65+
conf.setServiceUrl(pulsar.getBrokerServiceUrl());
66+
@Cleanup final var client = PulsarClientImpl.builder().conf(conf)
67+
.internalExecutorProvider(new ExecutorProvider(1, "internal", false,
68+
this::newThreadFactory))
69+
.externalExecutorProvider(new ExecutorProvider(1, "external", false))
70+
.build();
71+
72+
final var topic = "test-message-with-wrong-epoch";
73+
@Cleanup final var consumer = (ConsumerImpl<byte[]>) client.newConsumer()
74+
.topic(topic).subscriptionName("sub").poolMessages(true).subscribe();
75+
76+
final var cnx = consumer.cnx();
77+
consumer.redeliverUnacknowledgedMessages(); // increase the consumer epoch
78+
Assert.assertEquals(consumer.consumerEpoch, 1L);
79+
final BiConsumer<Long, String> sendMessage = (epoch, value) -> {
80+
cnx.ctx().executor().execute(() -> {
81+
final var cmd = new BaseCommand();
82+
cmd.copyFrom(Commands.newMessageCommand(consumer.consumerId, 0L, 0L, 0, 0, null, epoch));
83+
final var metadata = new MessageMetadata().setPublishTime(System.currentTimeMillis())
84+
.setProducerName("producer").setSequenceId(0).clearNumMessagesInBatch();
85+
final var buffer = Commands.serializeMetadataAndPayload(Commands.ChecksumType.None, metadata,
86+
Unpooled.wrappedBuffer(value.getBytes()));
87+
cnx.handleMessage(cmd.getMessage(), buffer);
88+
});
89+
};
90+
sendMessage.accept(0L, "msg-0"); // 0 is an old epoch that will be rejected
91+
sendMessage.accept(1L, "msg-1");
92+
93+
final var msg = consumer.receive(3, TimeUnit.SECONDS);
94+
Assert.assertNotNull(msg);
95+
Assert.assertEquals(msg.getValue(), "msg-1".getBytes(StandardCharsets.UTF_8));
96+
Assert.assertTrue(threadFailures.isEmpty());
97+
}
98+
99+
private ExecutorProvider.ExtendedThreadFactory newThreadFactory(String poolName, boolean daemon) {
100+
return new ExecutorProvider.ExtendedThreadFactory(poolName, daemon) {
101+
102+
@Override
103+
public Thread newThread(Runnable r) {
104+
final var thread = super.newThread(r);
105+
thread.setUncaughtExceptionHandler((t, e) -> {
106+
log.error("Unexpected exception in {}", t.getName(), e);
107+
threadFailures.computeIfAbsent(t, __ -> new CopyOnWriteArrayList<>()).add(e);
108+
});
109+
return thread;
110+
}
111+
};
112+
}
113+
}

pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.pulsar.client.util;
2020

2121
import static com.google.common.base.Preconditions.checkArgument;
22+
import com.google.common.annotations.VisibleForTesting;
2223
import io.netty.util.concurrent.DefaultThreadFactory;
2324
import java.util.ArrayList;
2425
import java.util.List;
@@ -27,6 +28,7 @@
2728
import java.util.concurrent.Executors;
2829
import java.util.concurrent.TimeUnit;
2930
import java.util.concurrent.atomic.AtomicInteger;
31+
import java.util.function.BiFunction;
3032
import lombok.Getter;
3133
import lombok.extern.slf4j.Slf4j;
3234
import org.apache.commons.lang3.tuple.Pair;
@@ -65,13 +67,19 @@ public ExecutorProvider(int numThreads, String poolName) {
6567
}
6668

6769
public ExecutorProvider(int numThreads, String poolName, boolean daemon) {
70+
this(numThreads, poolName, daemon, ExtendedThreadFactory::new);
71+
}
72+
73+
@VisibleForTesting
74+
public ExecutorProvider(
75+
int numThreads, String poolName, boolean daemon,
76+
BiFunction<String/* poolName */, Boolean/* daemon */, ExtendedThreadFactory> threadFactoryCreator) {
6877
checkArgument(numThreads > 0);
6978
this.numThreads = numThreads;
7079
Objects.requireNonNull(poolName);
7180
executors = new ArrayList<>(numThreads);
7281
for (int i = 0; i < numThreads; i++) {
73-
ExtendedThreadFactory threadFactory = new ExtendedThreadFactory(
74-
poolName, daemon);
82+
ExtendedThreadFactory threadFactory = threadFactoryCreator.apply(poolName, daemon);
7583
ExecutorService executor = createExecutor(threadFactory);
7684
executors.add(Pair.of(executor, threadFactory));
7785
}

0 commit comments

Comments
 (0)