Skip to content

Commit 6d0757a

Browse files
artembilangaryrussell
authored andcommitted
Improve ImapIdleChannelAdapter (#3045)
* Improve ImapIdleChannelAdapter * We should not destroy a `TaskExecutor` in the `stop()`, especially when we are going to restart eventually. Move that logic into `destroy()` * we should not destroy `MailReceiver` in the `stop()`; we don't reinstate it in the `start()`. Move the logic into `destroy()` * Wrap `ReceivingTask` and `IdleTask` into `isRunning()` condition to avoid task executions when we are in stopped state * Remove `ImapMailReceiverTests.testExecShutdown()` since it is not relevant any more and doesn't reflect `mail` module requirements * * Add `ImapMailReceiver.cancelPing()` hook * Also close folder for each `stop()`, as well as in the `destroy()`
1 parent 4db71d7 commit 6d0757a

File tree

4 files changed

+59
-77
lines changed

4 files changed

+59
-77
lines changed

spring-integration-mail/src/main/java/org/springframework/integration/mail/AbstractMailReceiver.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -390,7 +390,7 @@ public Object[] receive() throws javax.mail.MessagingException {
390390
}
391391
}
392392

393-
private void closeFolder() {
393+
protected void closeFolder() {
394394
this.folderReadLock.lock();
395395
try {
396396
MailTransportUtils.closeFolder(this.folder, this.shouldDeleteMessages);

spring-integration-mail/src/main/java/org/springframework/integration/mail/ImapIdleChannelAdapter.java

Lines changed: 47 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ public class ImapIdleChannelAdapter extends MessageProducerSupport implements Be
7878

7979
private boolean shouldReconnectAutomatically = true;
8080

81-
private Executor sendingTaskExecutor;
81+
private Executor sendingTaskExecutor = Executors.newFixedThreadPool(1);
8282

8383
private boolean sendingTaskExecutorSet;
8484

@@ -95,14 +95,14 @@ public ImapIdleChannelAdapter(ImapMailReceiver mailReceiver) {
9595

9696
public void setTransactionSynchronizationFactory(
9797
TransactionSynchronizationFactory transactionSynchronizationFactory) {
98+
9899
this.transactionSynchronizationFactory = transactionSynchronizationFactory;
99100
}
100101

101102
public void setAdviceChain(List<Advice> adviceChain) {
102103
this.adviceChain = adviceChain;
103104
}
104105

105-
106106
/**
107107
* Specify an {@link Executor} used to send messages received by the
108108
* adapter.
@@ -156,29 +156,23 @@ public void setApplicationEventPublisher(ApplicationEventPublisher applicationEv
156156
protected void doStart() {
157157
TaskScheduler scheduler = getTaskScheduler();
158158
Assert.notNull(scheduler, "'taskScheduler' must not be null");
159-
if (this.sendingTaskExecutor == null) {
160-
this.sendingTaskExecutor = Executors.newFixedThreadPool(1);
161-
}
162159
this.receivingTask = scheduler.schedule(new ReceivingTask(), this.receivingTaskTrigger);
163160
}
164161

165162
@Override
166163
// guarded by super#lifecycleLock
167164
protected void doStop() {
168165
this.receivingTask.cancel(true);
169-
try {
170-
this.mailReceiver.destroy();
171-
}
172-
catch (Exception e) {
173-
throw new IllegalStateException(
174-
"Failure during the destruction of Mail receiver: " + this.mailReceiver, e);
175-
}
176-
/*
177-
* If we're running with the default executor, shut it down.
178-
*/
166+
this.mailReceiver.cancelPing();
167+
}
168+
169+
@Override
170+
public void destroy() {
171+
super.destroy();
172+
this.mailReceiver.destroy();
173+
// If we're running with the default executor, shut it down.
179174
if (!this.sendingTaskExecutorSet && this.sendingTaskExecutor != null) {
180175
((ExecutorService) this.sendingTaskExecutor).shutdown();
181-
this.sendingTaskExecutor = null;
182176
}
183177
}
184178

@@ -250,17 +244,19 @@ private class ReceivingTask implements Runnable {
250244

251245
@Override
252246
public void run() {
253-
try {
254-
ImapIdleChannelAdapter.this.idleTask.run();
255-
logger.debug("Task completed successfully. Re-scheduling it again right away.");
256-
}
257-
catch (Exception e) { //run again after a delay
258-
if (logger.isWarnEnabled()) {
259-
logger.warn("Failed to execute IDLE task. Will attempt to resubmit in "
260-
+ ImapIdleChannelAdapter.this.reconnectDelay + " milliseconds.", e);
247+
if (isRunning()) {
248+
try {
249+
ImapIdleChannelAdapter.this.idleTask.run();
250+
logger.debug("Task completed successfully. Re-scheduling it again right away.");
251+
}
252+
catch (Exception e) { //run again after a delay
253+
if (logger.isWarnEnabled()) {
254+
logger.warn("Failed to execute IDLE task. Will attempt to resubmit in "
255+
+ ImapIdleChannelAdapter.this.reconnectDelay + " milliseconds.", e);
256+
}
257+
ImapIdleChannelAdapter.this.receivingTaskTrigger.delayNextExecution();
258+
publishException(e);
261259
}
262-
ImapIdleChannelAdapter.this.receivingTaskTrigger.delayNextExecution();
263-
publishException(e);
264260
}
265261
}
266262

@@ -275,38 +271,33 @@ private class IdleTask implements Runnable {
275271

276272
@Override
277273
public void run() {
278-
final TaskScheduler scheduler = getTaskScheduler();
279-
Assert.notNull(scheduler, "'taskScheduler' must not be null");
280-
/*
281-
* The following shouldn't be necessary because doStart() will have ensured we have
282-
* one. But, just in case...
283-
*/
284-
Assert.state(ImapIdleChannelAdapter.this.sendingTaskExecutor != null,
285-
"'sendingTaskExecutor' must not be null");
286-
287-
try {
288-
logger.debug("waiting for mail");
289-
ImapIdleChannelAdapter.this.mailReceiver.waitForNewMessages();
290-
Folder folder = ImapIdleChannelAdapter.this.mailReceiver.getFolder();
291-
if (folder != null && folder.isOpen()) {
292-
Object[] mailMessages = ImapIdleChannelAdapter.this.mailReceiver.receive();
293-
if (logger.isDebugEnabled()) {
294-
logger.debug("received " + mailMessages.length + " mail messages");
295-
}
296-
for (Object mailMessage : mailMessages) {
297-
Runnable messageSendingTask = createMessageSendingTask(mailMessage);
298-
ImapIdleChannelAdapter.this.sendingTaskExecutor.execute(messageSendingTask);
274+
if (isRunning()) {
275+
try {
276+
logger.debug("waiting for mail");
277+
ImapIdleChannelAdapter.this.mailReceiver.waitForNewMessages();
278+
Folder folder = ImapIdleChannelAdapter.this.mailReceiver.getFolder();
279+
if (folder != null && folder.isOpen() && isRunning()) {
280+
Object[] mailMessages = ImapIdleChannelAdapter.this.mailReceiver.receive();
281+
if (logger.isDebugEnabled()) {
282+
logger.debug("received " + mailMessages.length + " mail messages");
283+
}
284+
for (Object mailMessage : mailMessages) {
285+
Runnable messageSendingTask = createMessageSendingTask(mailMessage);
286+
if (isRunning()) {
287+
ImapIdleChannelAdapter.this.sendingTaskExecutor.execute(messageSendingTask);
288+
}
289+
}
299290
}
300291
}
301-
}
302-
catch (MessagingException e) {
303-
logger.warn("error occurred in idle task", e);
304-
if (ImapIdleChannelAdapter.this.shouldReconnectAutomatically) {
305-
throw new IllegalStateException("Failure in 'idle' task. Will resubmit.", e);
306-
}
307-
else {
308-
throw new org.springframework.messaging.MessagingException(
309-
"Failure in 'idle' task. Will NOT resubmit.", e);
292+
catch (MessagingException e) {
293+
logger.warn("error occurred in idle task", e);
294+
if (ImapIdleChannelAdapter.this.shouldReconnectAutomatically) {
295+
throw new IllegalStateException("Failure in 'idle' task. Will resubmit.", e);
296+
}
297+
else {
298+
throw new org.springframework.messaging.MessagingException(
299+
"Failure in 'idle' task. Will NOT resubmit.", e);
300+
}
310301
}
311302
}
312303
}

spring-integration-mail/src/main/java/org/springframework/integration/mail/ImapMailReceiver.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,9 +156,20 @@ public void destroy() {
156156
if (this.isInternalScheduler) {
157157
((ThreadPoolTaskScheduler) this.scheduler).shutdown();
158158
}
159+
cancelPing();
160+
}
161+
162+
/**
163+
* The hook to be called when we need to cancel the current ping task and close the mail folder.
164+
* In other words: when IMAP idle should be stopped for some reason.
165+
* The next {@link #waitForNewMessages()} call will re-open the folder and start a new ping task.
166+
* @since 5.2
167+
*/
168+
public void cancelPing() {
159169
if (this.pingTask != null) {
160170
this.pingTask.cancel(true);
161171
}
172+
closeFolder();
162173
}
163174

164175
/**

spring-integration-mail/src/test/java/org/springframework/integration/mail/ImapMailReceiverTests.java

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
import java.util.Arrays;
3535
import java.util.Properties;
3636
import java.util.concurrent.CountDownLatch;
37-
import java.util.concurrent.ExecutorService;
3837
import java.util.concurrent.TimeUnit;
3938
import java.util.concurrent.atomic.AtomicInteger;
4039
import java.util.concurrent.atomic.AtomicReference;
@@ -808,25 +807,6 @@ private Folder testAttachmentsGuts(final ImapMailReceiver receiver) throws Messa
808807
return folder;
809808
}
810809

811-
@Test
812-
public void testExecShutdown() {
813-
ImapIdleChannelAdapter adapter = new ImapIdleChannelAdapter(new ImapMailReceiver());
814-
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
815-
taskScheduler.initialize();
816-
adapter.setTaskScheduler(taskScheduler);
817-
adapter.setReconnectDelay(1);
818-
adapter.start();
819-
ExecutorService exec = TestUtils.getPropertyValue(adapter, "sendingTaskExecutor", ExecutorService.class);
820-
adapter.stop();
821-
assertThat(exec.isShutdown()).isTrue();
822-
adapter.start();
823-
exec = TestUtils.getPropertyValue(adapter, "sendingTaskExecutor", ExecutorService.class);
824-
adapter.stop();
825-
assertThat(exec.isShutdown()).isTrue();
826-
827-
taskScheduler.shutdown();
828-
}
829-
830810
@Test
831811
public void testNullMessages() throws Exception {
832812
Message message1 = mock(Message.class);

0 commit comments

Comments
 (0)