@@ -71,12 +71,17 @@ public class JMSWorker {
7171 private boolean connected = false ; // Whether connected to MQ
7272 private AtomicBoolean closeNow ; // Whether close has been requested
7373 private AbstractConfig config ;
74- private long receiveTimeout ; // Receive timeout for the jms consumer
74+ private long initialReceiveTimeoutMs ; // Receive timeout for the jms consumer
75+ private long subsequentReceiveTimeoutMs ; // Receive timeout for the jms consumer on the subsequent calls
7576 private long reconnectDelayMillisMin ; // Delay between repeated reconnect attempts min
7677 private long reconnectDelayMillisMax ; // Delay between repeated reconnect attempts max
7778
78- long getReceiveTimeout () {
79- return receiveTimeout ;
79+ long getInitialReceiveTimeoutMs () {
80+ return initialReceiveTimeoutMs ;
81+ }
82+
83+ long getSubsequentReceiveTimeoutMs () {
84+ return subsequentReceiveTimeoutMs ;
8085 }
8186
8287 long getReconnectDelayMillisMin () {
@@ -144,7 +149,8 @@ public void configure(final AbstractConfig config) {
144149 userName = config .getString (MQSourceConnector .CONFIG_NAME_MQ_USER_NAME );
145150 password = config .getPassword (MQSourceConnector .CONFIG_NAME_MQ_PASSWORD );
146151 topic = config .getString (MQSourceConnector .CONFIG_NAME_TOPIC );
147- receiveTimeout = config .getLong (MQSourceConnector .CONFIG_MAX_RECEIVE_TIMEOUT );
152+ initialReceiveTimeoutMs = config .getLong (MQSourceConnector .CONFIG_MAX_RECEIVE_TIMEOUT );
153+ subsequentReceiveTimeoutMs = config .getLong (MQSourceConnector .CONFIG_SUBSEQUENT_RECEIVE_TIMEOUT );
148154 reconnectDelayMillisMin = config .getLong (MQSourceConnector .CONFIG_RECONNECT_DELAY_MIN );
149155 reconnectDelayMillisMax = config .getLong (MQSourceConnector .CONFIG_RECONNECT_DELAY_MAX );
150156 } catch (JMSException | JMSRuntimeException jmse ) {
@@ -222,10 +228,14 @@ public void connect() {
222228 *
223229 * @param queueName The name of the queue to get messages from
224230 * @param queueConfig Any particular queue configuration that should be applied
225- * @param wait Whether to wait indefinitely for a message
231+ * @param initialCall Indicates whether this is the initial receive call in the polling cycle.
232+ * Determines which configured timeout to use:
233+ * - If true, uses the initial receive timeout.
234+ * - If false, uses the subsequent receive timeout.
235+ * A timeout value of 0 results in a non-blocking receiveNoWait() call.
226236 * @return The Message retrieved from MQ
227237 */
228- public Message receive (final String queueName , final QueueConfig queueConfig , final boolean wait ) throws JMSRuntimeException , JMSException {
238+ public Message receive (final String queueName , final QueueConfig queueConfig , final boolean initialCall ) throws JMSRuntimeException , JMSException {
229239 log .trace ("[{}] Entry {}.receive" , Thread .currentThread ().getId (), this .getClass ().getName ());
230240
231241 if (!maybeReconnect ()) {
@@ -243,21 +253,25 @@ public Message receive(final String queueName, final QueueConfig queueConfig, fi
243253 jmsConsumers .put (queueName , internalConsumer );
244254 }
245255
246- Message message = null ;
247- if (wait ) {
248- log .debug ("Waiting {} ms for message" , receiveTimeout );
249256
250- message = internalConsumer .receive (receiveTimeout );
257+ final long timeoutMs = initialCall
258+ ? initialReceiveTimeoutMs
259+ : subsequentReceiveTimeoutMs ;
260+
261+ Message message = null ;
262+ if (timeoutMs > 0 ) {
263+ // block up to timeoutMs
264+ message = internalConsumer .receive (timeoutMs );
251265
252266 if (message == null ) {
253- log .debug ("No message received" );
267+ log .debug ("No message received within {} ms on queue={}" , timeoutMs , queueName );
254268 }
255269 } else {
270+ // non‐blocking
256271 message = internalConsumer .receiveNoWait ();
257272 }
258273
259274 log .trace ("[{}] Exit {}.receive, retval={}" , Thread .currentThread ().getId (), this .getClass ().getName (), message );
260-
261275 return message ;
262276 }
263277
0 commit comments