1616 */
1717package org .apache .activemq .perf ;
1818
19+ import java .util .concurrent .TimeUnit ;
20+
1921import jakarta .jms .Connection ;
2022import jakarta .jms .JMSException ;
2123import jakarta .jms .MapMessage ;
3941public class InactiveDurableTopicTest extends TestCase {
4042 private static final transient Logger LOG = LoggerFactory .getLogger (InactiveDurableTopicTest .class );
4143
42- private static final int MESSAGE_COUNT = 2000 ;
44+ /**
45+ * Keep the payload small so that the test completes quickly but still
46+ * exercises durable subscription behaviour.
47+ */
48+ private static final int MESSAGE_COUNT = 500 ;
4349 private static final String DEFAULT_PASSWORD = "" ;
4450 private static final String USERNAME = "testuser" ;
4551 private static final String CLIENTID = "mytestclient" ;
@@ -55,21 +61,28 @@ public class InactiveDurableTopicTest extends TestCase {
5561 private ActiveMQConnectionFactory connectionFactory ;
5662 private BrokerService broker ;
5763
64+ private static final int SEND_TIMEOUT_MILLIS = (int ) TimeUnit .SECONDS .toMillis (30 );
65+ private static final long SEND_LOOP_TIMEOUT_MILLIS = TimeUnit .MINUTES .toMillis (2 );
66+ private static final long RECEIVE_TIMEOUT_MILLIS = TimeUnit .SECONDS .toMillis (5 );
67+ private static final String BROKER_NAME = "inactiveDurableTopicTest" ;
68+
5869 @ Override
5970 protected void setUp () throws Exception {
6071 super .setUp ();
6172 broker = new BrokerService ();
62-
63- // broker.setPersistenceAdapter(new KahaPersistenceAdapter() );
64- broker .addConnector (ActiveMQConnectionFactory .DEFAULT_BROKER_BIND_URL );
73+ broker . setUseJmx ( false );
74+ broker .setBrokerName ( BROKER_NAME );
75+ // broker.addConnector(ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL);
6576 broker .start ();
66- connectionFactory = new ActiveMQConnectionFactory (ActiveMQConnectionFactory .DEFAULT_BROKER_URL );
77+ // connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_BROKER_URL);
78+ connectionFactory = new ActiveMQConnectionFactory ("vm://" + BROKER_NAME );
6779 /*
6880 * Doesn't matter if you enable or disable these, so just leaving them
6981 * out for this test case connectionFactory.setAlwaysSessionAsync(true);
7082 * connectionFactory.setAsyncDispatch(true);
7183 */
7284 connectionFactory .setUseAsyncSend (true );
85+ connectionFactory .setSendTimeout (SEND_TIMEOUT_MILLIS );
7386 }
7487
7588 @ Override
@@ -124,9 +137,13 @@ public void test2ProducerTestCase() {
124137 assertNotNull (msg );
125138 msg .setString ("key1" , "value1" );
126139 int loop ;
140+ long start = System .currentTimeMillis ();
127141 for (loop = 0 ; loop < MESSAGE_COUNT ; loop ++) {
128142 msg .setInt ("key2" , loop );
129143 publisher .send (msg , DELIVERY_MODE , DELIVERY_PRIORITY , Message .DEFAULT_TIME_TO_LIVE );
144+ if (System .currentTimeMillis () - start > SEND_LOOP_TIMEOUT_MILLIS ) {
145+ throw new AssertionFailedError ("Timed out sending messages at loop: " + loop );
146+ }
130147 if (loop % 5000 == 0 ) {
131148 LOG .info ("Sent " + loop + " messages" );
132149 }
@@ -163,7 +180,10 @@ public void test3CreateSubscription() throws Exception {
163180 assertNotNull (subscriber );
164181 int loop ;
165182 for (loop = 0 ; loop < MESSAGE_COUNT ; loop ++) {
166- subscriber .receive ();
183+ Message message = subscriber .receive (RECEIVE_TIMEOUT_MILLIS );
184+ if (message == null ) {
185+ throw new AssertionFailedError ("Timed out waiting for message " + loop );
186+ }
167187 if (loop % 500 == 0 ) {
168188 LOG .debug ("Received " + loop + " messages" );
169189 }
0 commit comments