Skip to content

Commit 17fc4ea

Browse files
committed
Fix IntReactiveUtils for the proper emission
* Enable `MessageChannelReactiveUtilsTests.testOverproducingWithSubscribableChannel()` back * Disable `StompServerIntegrationTests` again since build on CI is stalled again
1 parent d14c5d5 commit 17fc4ea

File tree

3 files changed

+8
-4
lines changed

3 files changed

+8
-4
lines changed

spring-integration-core/src/main/java/org/springframework/integration/util/IntegrationReactiveUtils.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.springframework.integration.util;
1818

1919
import java.time.Duration;
20+
import java.util.concurrent.locks.LockSupport;
2021

2122
import org.reactivestreams.Publisher;
2223

@@ -127,7 +128,11 @@ private static <T> Flux<Message<T>> adaptSubscribableChannelToPublisher(Subscrib
127128
return Flux.defer(() -> {
128129
Sinks.Many<Message<T>> sink = Sinks.many().multicast().onBackpressureBuffer(1);
129130
@SuppressWarnings("unchecked")
130-
MessageHandler messageHandler = (message) -> sink.emitNext((Message<T>) message);
131+
MessageHandler messageHandler = (message) -> {
132+
while (!sink.emitNext((Message<T>) message).hasEmitted()) {
133+
LockSupport.parkNanos(10);
134+
}
135+
};
131136
inputChannel.subscribe(messageHandler);
132137
return sink.asFlux()
133138
.doOnCancel(() -> inputChannel.unsubscribe(messageHandler));

spring-integration-core/src/test/java/org/springframework/integration/channel/MessageChannelReactiveUtilsTests.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import java.time.Duration;
2222
import java.util.concurrent.atomic.AtomicInteger;
2323

24-
import org.junit.jupiter.api.Disabled;
2524
import org.junit.jupiter.api.Test;
2625

2726
import org.springframework.integration.util.IntegrationReactiveUtils;
@@ -71,7 +70,6 @@ void testBackpressureWithSubscribableChannel() {
7170
}
7271

7372
@Test
74-
@Disabled("Backpressure is not honored")
7573
void testOverproducingWithSubscribableChannel() {
7674
DirectChannel channel = new DirectChannel();
7775

spring-integration-stomp/src/test/java/org/springframework/integration/stomp/client/StompServerIntegrationTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.activemq.broker.BrokerService;
2323
import org.junit.jupiter.api.AfterAll;
2424
import org.junit.jupiter.api.BeforeAll;
25+
import org.junit.jupiter.api.Disabled;
2526
import org.junit.jupiter.api.Test;
2627

2728
import org.springframework.context.ApplicationEvent;
@@ -62,7 +63,7 @@
6263
*
6364
* @since 4.2
6465
*/
65-
//@Disabled("Until the fix in reactor-netty-core")
66+
@Disabled("Until the fix in reactor-netty-core")
6667
public class StompServerIntegrationTests {
6768

6869
private static BrokerService activeMQBroker;

0 commit comments

Comments
 (0)