Skip to content

Commit 858c8b8

Browse files
authored
[Bug] When using SubscriptionPullConsumer to consume data, an error is reported when configuring ConsumerConstant.NODE_URLS_KEY as the cluster address. (apache#15322)
### Description When using SubscriptionPullConsumer to consume data, an error is reported when configuring ConsumerConstant.NODE_URLS_KEY as the cluster address. ### Code: ``` Properties consumerConfig = new Properties(); consumerConfig.put(ConsumerConstant.CONSUMER_ID_KEY, "c1"); consumerConfig.put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, "cg1"); consumerConfig.put(ConsumerConstant.NODE_URLS_KEY, Arrays.asList("192.168.1.1:6667","192.168.1.2:6667","192.168.1.3:6667")); consumerConfig.put(ConsumerConstant.USERNAME_KEY, "root"); consumerConfig.put(ConsumerConstant.PASSWORD_KEY, "root"); try (SubscriptionPullConsumer pullConsumer = new SubscriptionPullConsumer(consumerConfig)) { pullConsumer.open(); pullConsumer.subscribe("my_topic"); while (true) { List<SubscriptionMessage> messages = pullConsumer.poll(10000); for (final SubscriptionMessage message : messages) { final short messageType = message.getMessageType(); if (SubscriptionMessageType.isValidatedMessageType(messageType)) { for (final SubscriptionSessionDataSet dataSet : message.getSessionDataSetsHandler()) { while (dataSet.hasNext()) { final RowRecord record = dataSet.next(); System.out.println(record); } } } } } } ``` ### Error: org.apache.iotdb.rpc.subscription.exception.SubscriptionConnectionException: Cluster has no available subscription providers to connect with initial endpoints [TEndPoint(ip:localhost, port:0)] at org.apache.iotdb.session.subscription.consumer.SubscriptionProviders.openProviders(SubscriptionProviders.java:123) at org.apache.iotdb.session.subscription.consumer.SubscriptionConsumer.open(SubscriptionConsumer.java:260) at org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer.open(SubscriptionPullConsumer.java:112) at org.apache.iotdb.session.SessionTest.testNullCluster(SessionTest.java:1222) at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) at java.base/java.lang.reflect.Method.invoke(Method.java:580) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at org.junit.runners.ParentRunner.run(ParentRunner.java:413) at org.junit.runner.JUnitCore.run(JUnitCore.java:137) at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69) at com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38) at com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11) at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35) at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:232) at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:55) ### Reason Since the value property of Properties cannot be set to null, and the AbstractSubscriptionConsumer(final AbstractSubscriptionConsumerBuilder builder, final Properties properties) constructor sets default values, the condition if (Objects.nonNull(builder.host) || Objects.nonNull(builder.port)) in the AbstractSubscriptionConsumer(final AbstractSubscriptionConsumerBuilder builder) constructor will always evaluate to true. ##### Key changed/added classes iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java
1 parent b553a6f commit 858c8b8

File tree

1 file changed

+2
-6
lines changed

1 file changed

+2
-6
lines changed

iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -189,12 +189,8 @@ protected AbstractSubscriptionConsumer(
189189
final AbstractSubscriptionConsumerBuilder builder, final Properties properties) {
190190
this(
191191
builder
192-
.host(
193-
(String)
194-
properties.getOrDefault(ConsumerConstant.HOST_KEY, SessionConfig.DEFAULT_HOST))
195-
.port(
196-
(Integer)
197-
properties.getOrDefault(ConsumerConstant.PORT_KEY, SessionConfig.DEFAULT_PORT))
192+
.host((String) properties.get(ConsumerConstant.HOST_KEY))
193+
.port((Integer) properties.get(ConsumerConstant.PORT_KEY))
198194
.nodeUrls((List<String>) properties.get(ConsumerConstant.NODE_URLS_KEY))
199195
.username(
200196
(String)

0 commit comments

Comments
 (0)