@@ -1640,7 +1640,7 @@ public void testSubscribeMultipleTopics() throws Exception {
16401640 assertEquals ("Should have received " + topics .length + " messages" , topics .length , received );
16411641 }
16421642
1643- @ Test (timeout = 60 * 1000 )
1643+ @ Test (timeout = 2 * 60 * 1000 )
16441644 public void testReceiveMessageSentWhileOffline () throws Exception {
16451645 final byte [] payload = new byte [1024 * 32 ];
16461646 for (int i = 0 ; i < payload .length ; i ++) {
@@ -1679,36 +1679,38 @@ public void testReceiveMessageSentWhileOffline() throws Exception {
16791679 // Wait for broker to process disconnect before publishing messages for offline delivery.
16801680 assertTrue ("Subscription should become inactive" ,
16811681 Wait .waitFor (() -> isSubscriptionInactive (topics [0 ], mqttSub .getClientId ().toString ()),
1682- TimeUnit .SECONDS .toMillis (5 ), 100 ));
1683-
1684- try {
1685- for (int j = 0 ; j < numberOfRuns ; j ++) {
1682+ TimeUnit .SECONDS .toMillis (10 ), 100 ));
16861683
1687- for (int i = 0 ; i < messagesPerRun ; ++i ) {
1688- connectionPub .publish (topics [0 ].name ().toString (), payload , QoS .AT_LEAST_ONCE , false );
1689- }
1684+ for (int j = 0 ; j < numberOfRuns ; j ++) {
16901685
1691- connectionSub = mqttSub . blockingConnection ();
1692- connectionSub . connect ( );
1693- connectionSub . subscribe ( topics );
1686+ for ( int i = 0 ; i < messagesPerRun ; ++ i ) {
1687+ connectionPub . publish ( topics [ 0 ]. name (). toString (), payload , QoS . AT_LEAST_ONCE , false );
1688+ }
16941689
1695- for (int i = 0 ; i < messagesPerRun ; ++i ) {
1696- Message message = connectionSub .receive (5 , TimeUnit .SECONDS );
1697- assertNotNull (message );
1698- received ++;
1699- assertTrue (Arrays .equals (payload , message .getPayload ()));
1700- message .ack ();
1701- }
1702- connectionSub .disconnect ();
1690+ connectionSub = mqttSub .blockingConnection ();
1691+ connectionSub .connect ();
1692+ connectionSub .subscribe (topics );
17031693
1704- // Wait for broker to process disconnect before next iteration publishes
1705- assertTrue ("Subscription should become inactive" ,
1706- Wait .waitFor (() -> isSubscriptionInactive (topics [0 ], mqttSub .getClientId ().toString ()),
1707- TimeUnit .SECONDS .toMillis (5 ), 100 ));
1694+ // Wait for broker to fully activate the subscription and start dispatching
1695+ // queued messages. subscribe() returns on SUBACK but broker processes the
1696+ // ConsumerInfo asynchronously, so messages may not be ready for dispatch yet.
1697+ assertTrue ("Subscription should become active in run " + (j + 1 ),
1698+ Wait .waitFor (() -> isSubscriptionActive (topics [0 ], mqttSub .getClientId ().toString ()),
1699+ TimeUnit .SECONDS .toMillis (30 ), 100 ));
1700+
1701+ for (int i = 0 ; i < messagesPerRun ; ++i ) {
1702+ Message message = connectionSub .receive (5 , TimeUnit .SECONDS );
1703+ assertNotNull ("Should have received message " + (i + 1 ) + " of " + messagesPerRun + " in run " + (j + 1 ), message );
1704+ received ++;
1705+ assertTrue (Arrays .equals (payload , message .getPayload ()));
1706+ message .ack ();
17081707 }
1709- } catch (Exception exception ) {
1710- LOG .error ("unexpected exception" , exception );
1711- exception .printStackTrace ();
1708+ connectionSub .disconnect ();
1709+
1710+ // Wait for broker to process disconnect before next iteration publishes
1711+ assertTrue ("Subscription should become inactive" ,
1712+ Wait .waitFor (() -> isSubscriptionInactive (topics [0 ], mqttSub .getClientId ().toString ()),
1713+ TimeUnit .SECONDS .toMillis (10 ), 100 ));
17121714 }
17131715 assertEquals ("Should have received " + (messagesPerRun * (numberOfRuns + 1 )) + " messages" , (messagesPerRun * (numberOfRuns + 1 )), received );
17141716 }
@@ -1727,6 +1729,20 @@ private boolean isSubscriptionInactive(Topic topic, String clientId) throws Exce
17271729 }
17281730 }
17291731
1732+ private boolean isSubscriptionActive (Topic topic , String clientId ) throws Exception {
1733+ if (isVirtualTopicSubscriptionStrategy ()) {
1734+ String queueName = buildVirtualTopicQueueName (topic , clientId );
1735+ try {
1736+ return getProxyToQueue (queueName ).getConsumerCount () > 0 ;
1737+ } catch (Exception ignore ) {
1738+ return false ;
1739+ }
1740+ } else {
1741+ return brokerService .getAdminView ().getDurableTopicSubscribers ().length == 1 &&
1742+ brokerService .getAdminView ().getInactiveDurableTopicSubscribers ().length == 0 ;
1743+ }
1744+ }
1745+
17301746 private boolean isVirtualTopicSubscriptionStrategy () {
17311747 String config = getProtocolConfig ();
17321748 return config != null && config .contains ("mqtt-virtual-topic-subscriptions" );
0 commit comments