|
16 | 16 |
|
17 | 17 | package org.springframework.integration.handler.advice; |
18 | 18 |
|
| 19 | +import java.util.concurrent.CountDownLatch; |
19 | 20 | import java.util.concurrent.ExecutionException; |
20 | 21 | import java.util.concurrent.Future; |
21 | 22 | import java.util.concurrent.TimeUnit; |
@@ -60,6 +61,9 @@ public class LockRequestHandlerAdviceTests { |
60 | 61 | @Autowired |
61 | 62 | QueueChannel discardChannel; |
62 | 63 |
|
| 64 | + @Autowired |
| 65 | + Config config; |
| 66 | + |
63 | 67 | @Test |
64 | 68 | void verifyLockAroundHandler() throws ExecutionException, InterruptedException, TimeoutException { |
65 | 69 | AsyncMessagingTemplate messagingTemplate = new AsyncMessagingTemplate(); |
@@ -89,12 +93,16 @@ void verifyLockAroundHandler() throws ExecutionException, InterruptedException, |
89 | 93 | Future<Object> test4 = |
90 | 94 | messagingTemplate.asyncConvertSendAndReceive(this.inputChannel, "test4", messagePostProcessor); |
91 | 95 |
|
92 | | - assertThat(test3.get(10, TimeUnit.SECONDS)).isEqualTo("longer_process-1"); |
93 | | - |
| 96 | + // It is hard to achieve exclusive access in time, so expect failure first, |
| 97 | + // then unblock count-down-latch barrier to let success pass. |
94 | 98 | assertThat(test4).failsWithin(10, TimeUnit.SECONDS) |
95 | 99 | .withThrowableOfType(ExecutionException.class) |
96 | 100 | .withRootCauseInstanceOf(TimeoutException.class) |
97 | 101 | .withStackTraceContaining("Could not acquire the lock in time: PT1S"); |
| 102 | + |
| 103 | + this.config.longProcessLatch.countDown(); |
| 104 | + |
| 105 | + assertThat(test3.get(10, TimeUnit.SECONDS)).isEqualTo("longer_process-1"); |
98 | 106 | } |
99 | 107 |
|
100 | 108 | @Configuration |
@@ -122,10 +130,19 @@ LockRequestHandlerAdvice lockRequestHandlerAdvice(LockRegistry lockRegistry, Que |
122 | 130 |
|
123 | 131 | AtomicInteger counter = new AtomicInteger(); |
124 | 132 |
|
| 133 | + CountDownLatch longProcessLatch = new CountDownLatch(1); |
| 134 | + |
125 | 135 | @ServiceActivator(inputChannel = "inputChannel", adviceChain = "lockRequestHandlerAdvice") |
126 | 136 | String handleWithDelay(String payload) throws InterruptedException { |
127 | 137 | int currentCount = this.counter.incrementAndGet(); |
128 | | - Thread.sleep("longer_process".equals(payload) ? 5000 : 500); |
| 138 | + if ("longer_process".equals(payload)) { |
| 139 | + // Hard to achieve blocking expectations just with timeouts. |
| 140 | + // So, wait for count-down-latch to be fulfilled. |
| 141 | + longProcessLatch.await(10, TimeUnit.SECONDS); |
| 142 | + } |
| 143 | + else { |
| 144 | + Thread.sleep(500); |
| 145 | + } |
129 | 146 | try { |
130 | 147 | return payload + "-" + currentCount; |
131 | 148 | } |
|
0 commit comments