2323import java .util .concurrent .TimeUnit ;
2424import java .util .concurrent .atomic .AtomicInteger ;
2525
26- import org .junit .Test ;
26+ import org .assertj .core .api .InstanceOfAssertFactories ;
27+ import org .junit .jupiter .api .Test ;
2728
2829import org .springframework .beans .factory .BeanFactory ;
2930import org .springframework .core .task .SimpleAsyncTaskExecutor ;
3031import org .springframework .core .task .SyncTaskExecutor ;
3132import org .springframework .integration .dispatcher .RoundRobinLoadBalancingStrategy ;
33+ import org .springframework .integration .util .ErrorHandlingTaskExecutor ;
3234import org .springframework .messaging .Message ;
3335import org .springframework .messaging .MessageChannel ;
3436import org .springframework .messaging .MessageDeliveryException ;
3941import org .springframework .scheduling .concurrent .CustomizableThreadFactory ;
4042
4143import static org .assertj .core .api .Assertions .assertThat ;
42- import static org .assertj .core .api .Assertions .fail ;
44+ import static org .assertj .core .api .Assertions .assertThatIllegalStateException ;
4345import static org .mockito .BDDMockito .willThrow ;
4446import static org .mockito .Mockito .mock ;
4547import static org .mockito .Mockito .verify ;
@@ -59,8 +61,7 @@ public void verifyDifferentThread() throws Exception {
5961 TestHandler handler = new TestHandler (latch );
6062 channel .subscribe (handler );
6163 channel .send (new GenericMessage <>("test" ));
62- latch .await (1000 , TimeUnit .MILLISECONDS );
63- assertThat (latch .getCount ()).isEqualTo (0 );
64+ assertThat (latch .await (10000 , TimeUnit .MILLISECONDS )).isTrue ();
6465 assertThat (handler .thread ).isNotNull ();
6566 assertThat (Thread .currentThread ().equals (handler .thread )).isFalse ();
6667 assertThat (handler .thread .getName ()).isEqualTo ("test-1" );
@@ -190,7 +191,11 @@ public void interceptorWithModifiedMessage() {
190191
191192 @ Test
192193 public void interceptorWithException () {
193- ExecutorChannel channel = new ExecutorChannel (new SyncTaskExecutor ());
194+ QueueChannel errorChannel = new QueueChannel ();
195+ MessagePublishingErrorHandler errorHandler = new MessagePublishingErrorHandler ();
196+ errorHandler .setDefaultErrorChannel (errorChannel );
197+ ErrorHandlingTaskExecutor executor = new ErrorHandlingTaskExecutor (new SyncTaskExecutor (), errorHandler );
198+ ExecutorChannel channel = new ExecutorChannel (executor );
194199 channel .setBeanFactory (mock (BeanFactory .class ));
195200 channel .afterPropertiesSet ();
196201
@@ -202,12 +207,16 @@ public void interceptorWithException() {
202207 BeforeHandleInterceptor interceptor = new BeforeHandleInterceptor ();
203208 channel .addInterceptor (interceptor );
204209 channel .subscribe (handler );
205- try {
206- channel .send (message );
207- }
208- catch (MessageDeliveryException actual ) {
209- assertThat (actual .getCause ()).isSameAs (expected );
210- }
210+ channel .send (message );
211+
212+ Message <?> receive = errorChannel .receive (10000 );
213+
214+ assertThat (receive ).
215+ extracting (Message ::getPayload )
216+ .asInstanceOf (InstanceOfAssertFactories .throwable (MessageDeliveryException .class ))
217+ .cause ()
218+ .isEqualTo (expected );
219+
211220 verify (handler ).handleMessage (message );
212221 assertThat (interceptor .getCounter ().get ()).isEqualTo (1 );
213222 assertThat (interceptor .wasAfterHandledInvoked ()).isTrue ();
@@ -216,17 +225,14 @@ public void interceptorWithException() {
216225 @ Test
217226 public void testEarlySubscribe () {
218227 ExecutorChannel channel = new ExecutorChannel (mock (Executor .class ));
219- try {
220- channel .subscribe (m -> {
221- });
222- channel .setBeanFactory (mock (BeanFactory .class ));
223- channel .afterPropertiesSet ();
224- fail ("expected Exception" );
225- }
226- catch (IllegalStateException e ) {
227- assertThat (e .getMessage ()).isEqualTo ("You cannot subscribe() until the channel "
228- + "bean is fully initialized by the framework. Do not subscribe in a @Bean definition" );
229- }
228+ channel .subscribe (m -> {
229+ });
230+ channel .setBeanFactory (mock (BeanFactory .class ));
231+
232+ assertThatIllegalStateException ()
233+ .isThrownBy (channel ::afterPropertiesSet )
234+ .withMessage ("You cannot subscribe() until the channel "
235+ + "bean is fully initialized by the framework. Do not subscribe in a @Bean definition" );
230236 }
231237
232238 private static class TestHandler implements MessageHandler {
0 commit comments