[Bug] NamespaceV2 is not working #9341

@lin-mt

Description

Before Creating the Bug Report

  • I found a bug, not just asking a question, which should be created in GitHub Discussions.

  • I have searched the GitHub Issues and GitHub Discussions of this repository and believe that this is not a duplicate.

  • I have confirmed that this bug belongs to the current repository, not other repositories of RocketMQ.

Runtime platform environment

Debian 12

Client: Docker Engine - Community
Version: 28.0.4

RocketMQ version

5.3.1 and 5.3.2

JDK Version

1.8

Describe the Bug

A consumer configured with one namespace (via setNamespaceV2) can consume messages produced under a different namespace.

Steps to Reproduce

Deploy RocketMQ with Docker Compose:

services:
  namesrv:
    image: apache/rocketmq:5.3.1
    container_name: rmqnamesrv
    restart: on-failure
    volumes:
      - ./logs/namesrv:/home/rocketmq/logs
    ports:
      - 9876:9876
    networks:
      - rocketmq
    command: sh mqnamesrv

  broker:
    image: apache/rocketmq:5.3.1
    container_name: rmqbroker
    restart: on-failure
    ports:
      - 10909:10909
      - 10911:10911
      - 10912:10912
    environment:
      - NAMESRV_ADDR=namesrv:9876
      - JAVA_OPT_EXT=-server -Xms512m -Xmx512m -Duser.home=/home/rocketmq
    volumes:
      - ./broker_master/store:/home/rocketmq/store
      - ./broker_master/broker.conf:/home/rocketmq/rocketmq-5.3.1/conf/broker.conf
      - ./logs/broker:/home/rocketmq/logs
    depends_on:
      - namesrv
    networks:
      - rocketmq
    command: sh mqbroker -c ../conf/broker.conf

  console:
    image: harbor.hyuxl.cn/apache/rocketmq-dashboard
    container_name: rmqconsole
    restart: on-failure
    ports:
      - 9080:8080
    environment:
      - JAVA_OPTS=-Dserver.port=8080 -Drocketmq.config.namesrvAddr=namesrv:9876
    depends_on:
      - namesrv
    networks:
      - rocketmq
networks:
  rocketmq:
    name: rocketmq
    driver: bridge

broker.conf

brokerClusterName=DefaultCluster
brokerName=Broker-A
brokerId=0
deleteWhen=03
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
listenPort=10911
brokerIP1=192.168.30.12
namesrvAddr=192.168.30.12:9876
autoCreateTopicEnable=true

Use the code in the example folder:
ProducerWithNamespace.java

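import java.nio.charset.StandardCharsets;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class ProducerWithNamespace {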

    public static final String NAMESPACE = "InstanceTest1";
    public static final String PRODUCER_GROUP = "pidTest";
    public static final String DEFAULT_NAMESRVADDR = "192.168.30.12:9876";
    public static final int MESSAGE_COUNT = 100;
    public static final String TOPIC = "NAMESPACE_TOPIC";
    public static final String TAG = "tagTest";

    public static void main(String[] args) throws Exception {

        DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);
        producer.setNamespaceV2(NAMESPACE);

        producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
        producer.start();
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            Message message = new Message(TOPIC, TAG, ("Hello world " + NAMESPACE).getBytes(StandardCharsets.UTF_8));
            try {
                SendResult result = producer.send(message);
                System.out.printf("Topic:%s send success, msgId is:%s%n", message.getTopic(), result.getMsgId());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        producer.shutdown();
    }
}

PushConsumerWithNamespace.java

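import java.nio.charset.StandardCharsets;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;

public class PushConsumerWithNamespace {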
    public static final String NAMESPACE = "InstanceTest";
    public static final String CONSUMER_GROUP = "cidTest";
    public static final String DEFAULT_NAMESRVADDR = "192.168.30.12:9876";
    public static final String TOPIC = "NAMESPACE_TOPIC";

    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
        defaultMQPushConsumer.setNamespaceV2(NAMESPACE);
        defaultMQPushConsumer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
        defaultMQPushConsumer.subscribe(TOPIC, "*");
        defaultMQPushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            msgs.forEach(msg -> System.out.printf("Msg topic is:%s, MsgId is:%s, Namespace is: %s, Message is: %s, reconsumeTimes is:%s%n", msg.getTopic(), msg.getMsgId(), NAMESPACE, new String(msg.getBody(), StandardCharsets.UTF_8), msg.getReconsumeTimes()));
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });

        defaultMQPushConsumer.start();
    }
}

What Did You Expect to See?

A consumer in namespace InstanceTest should not be able to consume messages produced in namespace InstanceTest1.
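
The expected isolation can be pictured with a small standalone sketch. It assumes the effective topic name is formed by prefixing the topic with the namespace and a `%` separator, which is the convention in RocketMQ's `NamespaceUtil`. Whether `setNamespaceV2` follows exactly this path is not confirmed here, and the class and method names below are hypothetical, written only for illustration.

```java
// Illustration only: mimics the "namespace%resource" naming convention.
// This is NOT the RocketMQ client code; names here are hypothetical.
public class NamespaceIsolationSketch {

    // Assumption: '%' separates the namespace from the resource name.
    static final char SEPARATOR = '%';

    // Build the effective resource name for a given namespace.
    static String wrap(String namespace, String resource) {
        if (namespace == null || namespace.isEmpty()) {
            return resource;
        }
        return namespace + SEPARATOR + resource;
    }

    // Recover the namespace part of a wrapped name ("" if none is present).
    static String namespaceOf(String wrapped) {
        int idx = wrapped.indexOf(SEPARATOR);
        return idx < 0 ? "" : wrapped.substring(0, idx);
    }

    public static void main(String[] args) {
        String producerTopic = wrap("InstanceTest1", "NAMESPACE_TOPIC");
        String consumerTopic = wrap("InstanceTest", "NAMESPACE_TOPIC");

        System.out.println(producerTopic); // InstanceTest1%NAMESPACE_TOPIC
        System.out.println(consumerTopic); // InstanceTest%NAMESPACE_TOPIC

        // Different effective names, so the consumer should see nothing.
        System.out.println(producerTopic.equals(consumerTopic)); // false
    }
}
```

Under that assumption the producer and consumer address two different topics, which is why no cross-namespace delivery is expected. Note also that `InstanceTest` is a string prefix of `InstanceTest1`; repeating the test with two unrelated namespace names might help rule out prefix matching as a factor.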

What Did You See Instead?

Image

Image

Additional Context

No response
