Skip to content

Commit 5632b4b

Browse files
[improve] change RoutingMode default from UseSinglePartition to RoundRobinDistribution (apache#507)
1 parent e04b2a9 commit 5632b4b

File tree

5 files changed

+24
-11
lines changed

5 files changed

+24
-11
lines changed

include/pulsar/ProducerConfiguration.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,7 @@ class PULSAR_PUBLIC ProducerConfiguration {
237237
/**
238238
* Set the message routing modes for partitioned topics.
239239
*
240-
* Default: UseSinglePartition
240+
* Default: RoundRobinDistribution
241241
*
242242
* @param PartitionsRoutingMode partition routing mode.
243243
* @return

lib/ProducerConfigurationImpl.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ struct ProducerConfigurationImpl {
3434
CompressionType compressionType{CompressionNone};
3535
int maxPendingMessages{1000};
3636
int maxPendingMessagesAcrossPartitions{50000};
37-
ProducerConfiguration::PartitionsRoutingMode routingMode{ProducerConfiguration::UseSinglePartition};
37+
ProducerConfiguration::PartitionsRoutingMode routingMode{ProducerConfiguration::RoundRobinDistribution};
3838
MessageRoutingPolicyPtr messageRouter;
3939
ProducerConfiguration::HashingScheme hashingScheme{ProducerConfiguration::BoostHash};
4040
bool useLazyStartPartitionedProducers{false};

tests/BasicEndToEndTest.cc

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1697,9 +1697,11 @@ TEST(BasicEndToEndTest, testSeekOnPartitionedTopic) {
16971697

16981698
std::string subName = "sub-testSeekOnPartitionedTopic";
16991699
Producer producer;
1700+
ProducerConfiguration conf;
1701+
conf.setPartitionsRoutingMode(ProducerConfiguration::UseSinglePartition);
17001702

17011703
Promise<Result, Producer> producerPromise;
1702-
client.createProducerAsync(topicName, WaitForCallbackValue<Producer>(producerPromise));
1704+
client.createProducerAsync(topicName, conf, WaitForCallbackValue<Producer>(producerPromise));
17031705
Future<Result, Producer> producerFuture = producerPromise.getFuture();
17041706
Result result = producerFuture.get(producer);
17051707
ASSERT_EQ(ResultOk, result);

tests/ProducerConfigurationTest.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ TEST(ProducerConfigurationTest, testDefaultConfig) {
3333
ASSERT_EQ(conf.getCompressionType(), CompressionType::CompressionNone);
3434
ASSERT_EQ(conf.getMaxPendingMessages(), 1000);
3535
ASSERT_EQ(conf.getMaxPendingMessagesAcrossPartitions(), 50000);
36-
ASSERT_EQ(conf.getPartitionsRoutingMode(), ProducerConfiguration::UseSinglePartition);
36+
ASSERT_EQ(conf.getPartitionsRoutingMode(), ProducerConfiguration::RoundRobinDistribution);
3737
ASSERT_EQ(conf.getMessageRouterPtr(), MessageRoutingPolicyPtr{});
3838
ASSERT_EQ(conf.getHashingScheme(), ProducerConfiguration::BoostHash);
3939
ASSERT_EQ(conf.getBlockIfQueueFull(), false);
@@ -88,8 +88,8 @@ TEST(ProducerConfigurationTest, testCustomConfig) {
8888
conf.setMaxPendingMessagesAcrossPartitions(100000);
8989
ASSERT_EQ(conf.getMaxPendingMessagesAcrossPartitions(), 100000);
9090

91-
conf.setPartitionsRoutingMode(ProducerConfiguration::RoundRobinDistribution);
92-
ASSERT_EQ(conf.getPartitionsRoutingMode(), ProducerConfiguration::RoundRobinDistribution);
91+
conf.setPartitionsRoutingMode(ProducerConfiguration::UseSinglePartition);
92+
ASSERT_EQ(conf.getPartitionsRoutingMode(), ProducerConfiguration::UseSinglePartition);
9393

9494
const auto router = std::make_shared<MockMessageRoutingPolicy>();
9595
conf.setMessageRouter(router);

tests/ReaderTest.cc

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,9 @@ TEST_P(ReaderTest, testSimpleReader) {
6767
ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader));
6868

6969
Producer producer;
70-
ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
70+
ProducerConfiguration producerConf;
71+
producerConf.setPartitionsRoutingMode(ProducerConfiguration::UseSinglePartition);
72+
ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConf, producer));
7173

7274
for (int i = 0; i < 10; i++) {
7375
std::string content = "my-message-" + std::to_string(i);
@@ -142,7 +144,9 @@ TEST_P(ReaderTest, testReaderAfterMessagesWerePublished) {
142144
initTopic(topicName);
143145

144146
Producer producer;
145-
ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
147+
ProducerConfiguration producerConf;
148+
producerConf.setPartitionsRoutingMode(ProducerConfiguration::UseSinglePartition);
149+
ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConf, producer));
146150

147151
for (int i = 0; i < 10; i++) {
148152
std::string content = "my-message-" + std::to_string(i);
@@ -176,7 +180,9 @@ TEST_P(ReaderTest, testMultipleReaders) {
176180
initTopic(topicName);
177181

178182
Producer producer;
179-
ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
183+
ProducerConfiguration producerConf;
184+
producerConf.setPartitionsRoutingMode(ProducerConfiguration::UseSinglePartition);
185+
ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConf, producer));
180186

181187
for (int i = 0; i < 10; i++) {
182188
std::string content = "my-message-" + std::to_string(i);
@@ -223,7 +229,9 @@ TEST_P(ReaderTest, testReaderOnLastMessage) {
223229
initTopic(topicName);
224230

225231
Producer producer;
226-
ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
232+
ProducerConfiguration producerConf;
233+
producerConf.setPartitionsRoutingMode(ProducerConfiguration::UseSinglePartition);
234+
ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConf, producer));
227235

228236
for (int i = 0; i < 10; i++) {
229237
std::string content = "my-message-" + std::to_string(i);
@@ -263,7 +271,9 @@ TEST_P(ReaderTest, testReaderOnSpecificMessage) {
263271
initTopic(topicName);
264272

265273
Producer producer;
266-
ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
274+
ProducerConfiguration producerConf;
275+
producerConf.setPartitionsRoutingMode(ProducerConfiguration::UseSinglePartition);
276+
ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConf, producer));
267277

268278
for (int i = 0; i < 10; i++) {
269279
std::string content = "my-message-" + std::to_string(i);
@@ -459,6 +469,7 @@ TEST_P(ReaderTest, testReaderReachEndOfTopicMessageWithoutBatches) {
459469
Producer producer;
460470
ProducerConfiguration producerConf;
461471
producerConf.setBatchingEnabled(false);
472+
producerConf.setPartitionsRoutingMode(ProducerConfiguration::UseSinglePartition);
462473
ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConf, producer));
463474

464475
// 2. create reader, and expect hasMessageAvailable return false since no message produced.

0 commit comments

Comments
 (0)