Skip to content

Commit d7dfea7

Browse files
committed
Optimize ZeroMqMessageHandlerTests for ZMQ.Poller
* Fix unused import in the `IntegrationDynamicWebSocketHandlerMapping`
1 parent 93743f6 commit d7dfea7

File tree

2 files changed

+19
-12
lines changed

2 files changed

+19
-12
lines changed

spring-integration-websocket/src/main/java/org/springframework/integration/websocket/config/IntegrationDynamicWebSocketHandlerMapping.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,13 @@
2828
import org.springframework.http.server.RequestPath;
2929
import org.springframework.web.HttpRequestHandler;
3030
import org.springframework.web.servlet.HandlerExecutionChain;
31-
import org.springframework.web.servlet.handler.AbstractHandlerMapping;
3231
import org.springframework.web.servlet.handler.AbstractUrlHandlerMapping;
3332
import org.springframework.web.util.ServletRequestPathUtils;
3433
import org.springframework.web.util.pattern.PathPattern;
3534
import org.springframework.web.util.pattern.PathPatternParser;
3635

3736
/**
38-
* The {@link AbstractHandlerMapping} implementation for dynamic WebSocket endpoint registrations in Spring Integration.
37+
* The {@link AbstractUrlHandlerMapping} implementation for dynamic WebSocket endpoint registrations in Spring Integration.
3938
* <p>
4039
* TODO until https://github.com/spring-projects/spring-framework/issues/26798
4140
*

spring-integration-zeromq/src/test/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandlerTests.java

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ void testMessageHandlerForPair() {
8686
}
8787

8888
@Test
89-
void testMessageHandlerForPubSub() throws InterruptedException {
89+
void testMessageHandlerForPubSub() {
9090
ZMQ.Socket subSocket = CONTEXT.createSocket(SocketType.SUB);
9191
subSocket.setReceiveTimeOut(20_000);
9292
int port = subSocket.bindToRandomPort("tcp://*");
@@ -100,19 +100,27 @@ void testMessageHandlerForPubSub() throws InterruptedException {
100100
messageHandler.setMessageMapper(new EmbeddedJsonHeadersMessageMapper());
101101
messageHandler.afterPropertiesSet();
102102

103-
// Give it some time to bind and subscribe
104-
Thread.sleep(2000);
103+
ZMQ.Poller poller = CONTEXT.createPoller(1);
104+
poller.register(subSocket, ZMQ.Poller.POLLIN);
105105

106106
Message<?> testMessage = MessageBuilder.withPayload("test").setHeader("topic", "testTopic").build();
107107
messageHandler.handleMessage(testMessage).subscribe();
108108

109-
ZMsg msg = ZMsg.recvMsg(subSocket);
110-
assertThat(msg).isNotNull();
111-
assertThat(msg.unwrap().getString(ZMQ.CHARSET)).isEqualTo("testTopic");
112-
Message<?> capturedMessage = new EmbeddedJsonHeadersMessageMapper().toMessage(msg.getFirst().getData());
113-
assertThat(capturedMessage).isEqualTo(testMessage);
114-
115-
msg.destroy();
109+
while (true) {
110+
poller.poll(10000);
111+
if (poller.pollin(0)) {
112+
ZMsg msg = ZMsg.recvMsg(subSocket);
113+
assertThat(msg).isNotNull();
114+
assertThat(msg.unwrap().getString(ZMQ.CHARSET)).isEqualTo("testTopic");
115+
Message<?> capturedMessage = new EmbeddedJsonHeadersMessageMapper().toMessage(msg.getFirst().getData());
116+
assertThat(capturedMessage).isEqualTo(testMessage);
117+
msg.destroy();
118+
break;
119+
}
120+
}
121+
122+
poller.unregister(subSocket);
123+
poller.close();
116124
messageHandler.destroy();
117125
subSocket.close();
118126
}

0 commit comments

Comments
 (0)