Subscription: [Bug] When using SubscriptionPullConsumer to consume data, an error is reported when configuring ConsumerConstant.NODE_URLS_KEY as the cluster address. by q977734161 · Pull Request #15322 · apache/iotdb

@q977734161

…s reported when configuring ConsumerConstant.NODE_URLS_KEY as the cluster address.

code:
Properties consumerConfig = new Properties();
        consumerConfig.put(ConsumerConstant.CONSUMER_ID_KEY, "c1");
        consumerConfig.put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, "cg1");
        consumerConfig.put(ConsumerConstant.NODE_URLS_KEY, Arrays.asList("192.168.1.1:6667","192.168.1.2:6667","192.168.1.3:6667"));
        consumerConfig.put(ConsumerConstant.USERNAME_KEY, "root");
        consumerConfig.put(ConsumerConstant.PASSWORD_KEY, "root");
        try (SubscriptionPullConsumer pullConsumer = new SubscriptionPullConsumer(consumerConfig)) {
            pullConsumer.open();
            pullConsumer.subscribe("my_topic");
            while (true) {
                List<SubscriptionMessage> messages = pullConsumer.poll(10000);
                for (final SubscriptionMessage message : messages) {
                    final short messageType = message.getMessageType();
                    if (SubscriptionMessageType.isValidatedMessageType(messageType)) {
                        for (final SubscriptionSessionDataSet dataSet : message.getSessionDataSetsHandler()) {
                            while (dataSet.hasNext()) {
                                final RowRecord record = dataSet.next();
                                System.out.println(record);
                            }
                        }
                    }
                }
            }
        }

Error:
org.apache.iotdb.rpc.subscription.exception.SubscriptionConnectionException: Cluster has no available subscription providers to connect with initial endpoints [TEndPoint(ip:localhost, port:0)]

	at org.apache.iotdb.session.subscription.consumer.SubscriptionProviders.openProviders(SubscriptionProviders.java:123)
	at org.apache.iotdb.session.subscription.consumer.SubscriptionConsumer.open(SubscriptionConsumer.java:260)
	at org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer.open(SubscriptionPullConsumer.java:112)
	at org.apache.iotdb.session.SessionTest.testNullCluster(SessionTest.java:1222)
	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
	at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
	at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
	at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
	at com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
	at com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)
	at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)
	at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:232)
	at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:55)

github-actions[bot]

bot reviewed Apr 11, 2025

VGalaxies

@SteveYurongSu SteveYurongSu changed the title [Bug] When using SubscriptionPullConsumer to consume data, an error is reported when configuring ConsumerConstant.NODE_URLS_KEY as the cluster address. Subscription: [Bug] When using SubscriptionPullConsumer to consume data, an error is reported when configuring ConsumerConstant.NODE_URLS_KEY as the cluster address.

Apr 17, 2025

SteveYurongSu

VGalaxies pushed a commit to VGalaxies/iotdb that referenced this pull request

Apr 17, 2025
…s reported when configuring ConsumerConstant.NODE_URLS_KEY as the cluster address. (apache#15322)

### Description
When using SubscriptionPullConsumer to consume data, an error is reported when configuring ConsumerConstant.NODE_URLS_KEY as the cluster address.


### Code:

```
Properties consumerConfig = new Properties();
        consumerConfig.put(ConsumerConstant.CONSUMER_ID_KEY, "c1");
        consumerConfig.put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, "cg1");
        consumerConfig.put(ConsumerConstant.NODE_URLS_KEY, Arrays.asList("192.168.1.1:6667","192.168.1.2:6667","192.168.1.3:6667"));
        consumerConfig.put(ConsumerConstant.USERNAME_KEY, "root");
        consumerConfig.put(ConsumerConstant.PASSWORD_KEY, "root");
        try (SubscriptionPullConsumer pullConsumer = new SubscriptionPullConsumer(consumerConfig)) {
            pullConsumer.open();
            pullConsumer.subscribe("my_topic");
            while (true) {
                List<SubscriptionMessage> messages = pullConsumer.poll(10000);
                for (final SubscriptionMessage message : messages) {
                    final short messageType = message.getMessageType();
                    if (SubscriptionMessageType.isValidatedMessageType(messageType)) {
                        for (final SubscriptionSessionDataSet dataSet : message.getSessionDataSetsHandler()) {
                            while (dataSet.hasNext()) {
                                final RowRecord record = dataSet.next();
                                System.out.println(record);
                            }
                        }
                    }
                }
            }
        }

```

### Error:
org.apache.iotdb.rpc.subscription.exception.SubscriptionConnectionException: Cluster has no available subscription providers to connect with initial endpoints [TEndPoint(ip:localhost, port:0)]

	at org.apache.iotdb.session.subscription.consumer.SubscriptionProviders.openProviders(SubscriptionProviders.java:123)
	at org.apache.iotdb.session.subscription.consumer.SubscriptionConsumer.open(SubscriptionConsumer.java:260)
	at org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer.open(SubscriptionPullConsumer.java:112)
	at org.apache.iotdb.session.SessionTest.testNullCluster(SessionTest.java:1222)
	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
	at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
	at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
	at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
	at com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
	at com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)
	at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)
	at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:232)
	at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:55)


### Reason
Since the value property of Properties cannot be set to null, and the AbstractSubscriptionConsumer(final AbstractSubscriptionConsumerBuilder builder, final Properties properties) constructor sets default values, the condition if (Objects.nonNull(builder.host) || Objects.nonNull(builder.port)) in the AbstractSubscriptionConsumer(final AbstractSubscriptionConsumerBuilder builder) constructor will always evaluate to true.

##### Key changed/added classes 
iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java

SteveYurongSu pushed a commit that referenced this pull request

Apr 21, 2025
…ta, an error is reported when configuring ConsumerConstant.NODE_URLS_KEY as the cluster address. (#15322) (#15358)

* [Bug] When using SubscriptionPullConsumer to consume data, an error is reported when configuring ConsumerConstant.NODE_URLS_KEY as the cluster address. (#15322)

### Description
When using SubscriptionPullConsumer to consume data, an error is reported when configuring ConsumerConstant.NODE_URLS_KEY as the cluster address.


### Code:

```
Properties consumerConfig = new Properties();
        consumerConfig.put(ConsumerConstant.CONSUMER_ID_KEY, "c1");
        consumerConfig.put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, "cg1");
        consumerConfig.put(ConsumerConstant.NODE_URLS_KEY, Arrays.asList("192.168.1.1:6667","192.168.1.2:6667","192.168.1.3:6667"));
        consumerConfig.put(ConsumerConstant.USERNAME_KEY, "root");
        consumerConfig.put(ConsumerConstant.PASSWORD_KEY, "root");
        try (SubscriptionPullConsumer pullConsumer = new SubscriptionPullConsumer(consumerConfig)) {
            pullConsumer.open();
            pullConsumer.subscribe("my_topic");
            while (true) {
                List<SubscriptionMessage> messages = pullConsumer.poll(10000);
                for (final SubscriptionMessage message : messages) {
                    final short messageType = message.getMessageType();
                    if (SubscriptionMessageType.isValidatedMessageType(messageType)) {
                        for (final SubscriptionSessionDataSet dataSet : message.getSessionDataSetsHandler()) {
                            while (dataSet.hasNext()) {
                                final RowRecord record = dataSet.next();
                                System.out.println(record);
                            }
                        }
                    }
                }
            }
        }

```

### Error:
org.apache.iotdb.rpc.subscription.exception.SubscriptionConnectionException: Cluster has no available subscription providers to connect with initial endpoints [TEndPoint(ip:localhost, port:0)]

	at org.apache.iotdb.session.subscription.consumer.SubscriptionProviders.openProviders(SubscriptionProviders.java:123)
	at org.apache.iotdb.session.subscription.consumer.SubscriptionConsumer.open(SubscriptionConsumer.java:260)
	at org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer.open(SubscriptionPullConsumer.java:112)
	at org.apache.iotdb.session.SessionTest.testNullCluster(SessionTest.java:1222)
	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
	at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
	at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
	at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
	at com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
	at com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)
	at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)
	at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:232)
	at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:55)


### Reason
Since the value property of Properties cannot be set to null, and the AbstractSubscriptionConsumer(final AbstractSubscriptionConsumerBuilder builder, final Properties properties) constructor sets default values, the condition if (Objects.nonNull(builder.host) || Objects.nonNull(builder.port)) in the AbstractSubscriptionConsumer(final AbstractSubscriptionConsumerBuilder builder) constructor will always evaluate to true.

##### Key changed/added classes 
iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java

* fixup

---------

Co-authored-by: lixiaobao <lixiaobao0202@gmail.com>

JackieTien97 pushed a commit that referenced this pull request

May 14, 2025
…s reported when configuring ConsumerConstant.NODE_URLS_KEY as the cluster address. (#15322)

### Description
When using SubscriptionPullConsumer to consume data, an error is reported when configuring ConsumerConstant.NODE_URLS_KEY as the cluster address.

### Code:

```
Properties consumerConfig = new Properties();
        consumerConfig.put(ConsumerConstant.CONSUMER_ID_KEY, "c1");
        consumerConfig.put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, "cg1");
        consumerConfig.put(ConsumerConstant.NODE_URLS_KEY, Arrays.asList("192.168.1.1:6667","192.168.1.2:6667","192.168.1.3:6667"));
        consumerConfig.put(ConsumerConstant.USERNAME_KEY, "root");
        consumerConfig.put(ConsumerConstant.PASSWORD_KEY, "root");
        try (SubscriptionPullConsumer pullConsumer = new SubscriptionPullConsumer(consumerConfig)) {
            pullConsumer.open();
            pullConsumer.subscribe("my_topic");
            while (true) {
                List<SubscriptionMessage> messages = pullConsumer.poll(10000);
                for (final SubscriptionMessage message : messages) {
                    final short messageType = message.getMessageType();
                    if (SubscriptionMessageType.isValidatedMessageType(messageType)) {
                        for (final SubscriptionSessionDataSet dataSet : message.getSessionDataSetsHandler()) {
                            while (dataSet.hasNext()) {
                                final RowRecord record = dataSet.next();
                                System.out.println(record);
                            }
                        }
                    }
                }
            }
        }

```

### Error:
org.apache.iotdb.rpc.subscription.exception.SubscriptionConnectionException: Cluster has no available subscription providers to connect with initial endpoints [TEndPoint(ip:localhost, port:0)]

	at org.apache.iotdb.session.subscription.consumer.SubscriptionProviders.openProviders(SubscriptionProviders.java:123)
	at org.apache.iotdb.session.subscription.consumer.SubscriptionConsumer.open(SubscriptionConsumer.java:260)
	at org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer.open(SubscriptionPullConsumer.java:112)
	at org.apache.iotdb.session.SessionTest.testNullCluster(SessionTest.java:1222)
	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
	at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
	at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
	at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
	at com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
	at com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)
	at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)
	at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:232)
	at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:55)

### Reason
Since the value property of Properties cannot be set to null, and the AbstractSubscriptionConsumer(final AbstractSubscriptionConsumerBuilder builder, final Properties properties) constructor sets default values, the condition if (Objects.nonNull(builder.host) || Objects.nonNull(builder.port)) in the AbstractSubscriptionConsumer(final AbstractSubscriptionConsumerBuilder builder) constructor will always evaluate to true.

##### Key changed/added classes
iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java

(cherry picked from commit 858c8b8)