@@ -74,9 +74,7 @@ public void testFilterConsumer() throws Exception {
7474 producer .send ("TagC" , msgSize );
7575 Assert .assertEquals ("Not all sent succeeded" , msgSize * 3 , producer .getAllUndupMsgBody ().size ());
7676 consumer .getListener ().waitForMessageConsume (msgSize * 2 , CONSUME_TIME );
77- assertThat (producer .getAllMsgBody ())
78- .containsAllIn (VerifyUtils .getFilterdMessage (producer .getAllMsgBody (),
79- consumer .getListener ().getAllMsgBody ()));
77+ assertThat (producer .getAllMsgBody ()).containsAllIn (VerifyUtils .getFilterdMessage (producer .getAllMsgBody (), consumer .getListener ().getAllMsgBody ()));
8078
8179 assertThat (consumer .getListener ().getAllMsgBody ().size ()).isEqualTo (msgSize * 2 );
8280 }
@@ -90,7 +88,8 @@ public void testFilterPullConsumer() throws Exception {
9088 DefaultMQPullConsumer consumer = new DefaultMQPullConsumer (group );
9189 consumer .setNamesrvAddr (NAMESRV_ADDR );
9290 consumer .start ();
93- Thread .sleep (3000 );
91+ consumer .getDefaultMQPullConsumerImpl ().getRebalanceImpl ().getmQClientFactory ().updateTopicRouteInfoFromNameServer (topic );
92+ consumer .getDefaultMQPullConsumerImpl ().getRebalanceImpl ().getmQClientFactory ().sendHeartbeatToAllBrokerWithLock ();
9493 producer .send ("TagA" , msgSize );
9594 producer .send ("TagB" , msgSize );
9695 producer .send ("TagC" , msgSize );
@@ -102,8 +101,7 @@ public void testFilterPullConsumer() throws Exception {
102101 SINGLE_MQ :
103102 while (true ) {
104103 try {
105- PullResult pullResult =
106- consumer .pull (mq , selector , getMessageQueueOffset (mq ), 32 );
104+ PullResult pullResult = consumer .pull (mq , selector , getMessageQueueOffset (mq ), 32 );
107105 putMessageQueueOffset (mq , pullResult .getNextBeginOffset ());
108106 switch (pullResult .getPullStatus ()) {
109107 case FOUND :
0 commit comments