Skip to content

Commit bdb606b

Browse files
committed
Polishing
1 parent ef1e17f commit bdb606b

File tree

7 files changed

+163
-142
lines changed

7 files changed

+163
-142
lines changed

spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/AbstractPromiseToListenableFutureAdapter.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,15 @@
2020
import java.util.concurrent.TimeUnit;
2121
import java.util.concurrent.TimeoutException;
2222

23+
import reactor.fn.Consumer;
24+
import reactor.rx.Promise;
25+
2326
import org.springframework.util.Assert;
2427
import org.springframework.util.concurrent.FailureCallback;
2528
import org.springframework.util.concurrent.ListenableFuture;
2629
import org.springframework.util.concurrent.ListenableFutureCallback;
2730
import org.springframework.util.concurrent.ListenableFutureCallbackRegistry;
2831
import org.springframework.util.concurrent.SuccessCallback;
29-
import reactor.fn.Consumer;
30-
import reactor.rx.Promise;
3132

3233
/**
3334
* Adapts a reactor {@link Promise} to {@link ListenableFuture} optionally converting
@@ -55,21 +56,20 @@ public void accept(S result) {
5556
try {
5657
registry.success(adapt(result));
5758
}
58-
catch (Throwable t) {
59-
registry.failure(t);
59+
catch (Throwable ex) {
60+
registry.failure(ex);
6061
}
6162
}
6263
});
6364

6465
this.promise.onError(new Consumer<Throwable>() {
6566
@Override
66-
public void accept(Throwable t) {
67-
registry.failure(t);
67+
public void accept(Throwable ex) {
68+
registry.failure(ex);
6869
}
6970
});
7071
}
7172

72-
protected abstract T adapt(S result);
7373

7474
@Override
7575
public T get() throws InterruptedException {
@@ -112,4 +112,7 @@ public void addCallback(SuccessCallback<? super T> successCallback, FailureCallb
112112
this.registry.addFailureCallback(failureCallback);
113113
}
114114

115+
116+
protected abstract T adapt(S result);
117+
115118
}

spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpClient.java

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,6 @@
6060
import org.springframework.messaging.tcp.TcpConnectionHandler;
6161
import org.springframework.messaging.tcp.TcpOperations;
6262
import org.springframework.util.Assert;
63-
import org.springframework.util.ClassUtils;
6463
import org.springframework.util.ReflectionUtils;
6564
import org.springframework.util.concurrent.ListenableFuture;
6665

@@ -83,7 +82,6 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
8382
private static final Method eventLoopGroupMethod = initEventLoopGroupMethod();
8483

8584

86-
8785
private final EventLoopGroup eventLoopGroup;
8886

8987
private final TcpClientFactory<Message<P>, Message<P>> tcpClientSpecFactory;
@@ -107,7 +105,6 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
107105
* @param codec the codec to use for encoding and decoding the TCP stream
108106
*/
109107
public Reactor2TcpClient(final String host, final int port, final Codec<Buffer, Message<P>, Message<P>> codec) {
110-
111108
// Reactor 2.0.5 requires NioEventLoopGroup vs 2.0.6+ requires EventLoopGroup
112109
final NioEventLoopGroup nioEventLoopGroup = initEventLoopGroup();
113110
this.eventLoopGroup = nioEventLoopGroup;
@@ -182,8 +179,8 @@ public ListenableFuture<Void> connect(final TcpConnectionHandler<P> connectionHa
182179
return new PassThroughPromiseToListenableFutureAdapter<Void>(
183180
promise.onError(new Consumer<Throwable>() {
184181
@Override
185-
public void accept(Throwable throwable) {
186-
connectionHandler.afterConnectFailure(throwable);
182+
public void accept(Throwable ex) {
183+
connectionHandler.afterConnectFailure(ex);
187184
}
188185
})
189186
);
@@ -262,16 +259,16 @@ private static Method initEventLoopGroupMethod() {
262259
return method;
263260
}
264261
}
265-
throw new IllegalStateException("No compatible Reactor version found.");
262+
throw new IllegalStateException("No compatible Reactor version found");
266263
}
267264

268265

269266
private static class SynchronousDispatcherConfigReader implements ConfigurationReader {
270267

271268
@Override
272269
public ReactorConfiguration read() {
273-
return new ReactorConfiguration(Collections.<DispatcherConfiguration>emptyList(),
274-
"sync", new Properties());
270+
return new ReactorConfiguration(
271+
Collections.<DispatcherConfiguration>emptyList(), "sync", new Properties());
275272
}
276273
}
277274

spring-websocket/src/main/java/org/springframework/web/socket/adapter/jetty/JettyWebSocketHandlerAdapter.java

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@
3838
import org.springframework.web.socket.WebSocketHandler;
3939
import org.springframework.web.socket.handler.ExceptionWebSocketHandlerDecorator;
4040

41-
4241
/**
4342
* Adapts {@link WebSocketHandler} to the Jetty 9 WebSocket API.
4443
*
@@ -59,8 +58,8 @@ public class JettyWebSocketHandlerAdapter {
5958

6059

6160
public JettyWebSocketHandlerAdapter(WebSocketHandler webSocketHandler, JettyWebSocketSession wsSession) {
62-
Assert.notNull(webSocketHandler, "webSocketHandler must not be null");
63-
Assert.notNull(wsSession, "wsSession must not be null");
61+
Assert.notNull(webSocketHandler, "WebSocketHandler must not be null");
62+
Assert.notNull(wsSession, "WebSocketSession must not be null");
6463
this.webSocketHandler = webSocketHandler;
6564
this.wsSession = wsSession;
6665
}
@@ -72,8 +71,8 @@ public void onWebSocketConnect(Session session) {
7271
this.wsSession.initializeNativeSession(session);
7372
this.webSocketHandler.afterConnectionEstablished(this.wsSession);
7473
}
75-
catch (Throwable t) {
76-
ExceptionWebSocketHandlerDecorator.tryCloseWithError(this.wsSession, t, logger);
74+
catch (Throwable ex) {
75+
ExceptionWebSocketHandlerDecorator.tryCloseWithError(this.wsSession, ex, logger);
7776
}
7877
}
7978

@@ -83,8 +82,8 @@ public void onWebSocketText(String payload) {
8382
try {
8483
this.webSocketHandler.handleMessage(this.wsSession, message);
8584
}
86-
catch (Throwable t) {
87-
ExceptionWebSocketHandlerDecorator.tryCloseWithError(this.wsSession, t, logger);
85+
catch (Throwable ex) {
86+
ExceptionWebSocketHandlerDecorator.tryCloseWithError(this.wsSession, ex, logger);
8887
}
8988
}
9089

@@ -94,8 +93,8 @@ public void onWebSocketBinary(byte[] payload, int offset, int length) {
9493
try {
9594
this.webSocketHandler.handleMessage(this.wsSession, message);
9695
}
97-
catch (Throwable t) {
98-
ExceptionWebSocketHandlerDecorator.tryCloseWithError(this.wsSession, t, logger);
96+
catch (Throwable ex) {
97+
ExceptionWebSocketHandlerDecorator.tryCloseWithError(this.wsSession, ex, logger);
9998
}
10099
}
101100

@@ -107,8 +106,8 @@ public void onWebSocketFrame(Frame frame) {
107106
try {
108107
this.webSocketHandler.handleMessage(this.wsSession, message);
109108
}
110-
catch (Throwable t) {
111-
ExceptionWebSocketHandlerDecorator.tryCloseWithError(this.wsSession, t, logger);
109+
catch (Throwable ex) {
110+
ExceptionWebSocketHandlerDecorator.tryCloseWithError(this.wsSession, ex, logger);
112111
}
113112
}
114113
}
@@ -119,8 +118,10 @@ public void onWebSocketClose(int statusCode, String reason) {
119118
try {
120119
this.webSocketHandler.afterConnectionClosed(this.wsSession, closeStatus);
121120
}
122-
catch (Throwable t) {
123-
logger.error("Unhandled error for " + this.wsSession, t);
121+
catch (Throwable ex) {
122+
if (logger.isErrorEnabled()) {
123+
logger.error("Unhandled error for " + this.wsSession, ex);
124+
}
124125
}
125126
}
126127

@@ -129,8 +130,8 @@ public void onWebSocketError(Throwable cause) {
129130
try {
130131
this.webSocketHandler.handleTransportError(this.wsSession, cause);
131132
}
132-
catch (Throwable t) {
133-
ExceptionWebSocketHandlerDecorator.tryCloseWithError(this.wsSession, t, logger);
133+
catch (Throwable ex) {
134+
ExceptionWebSocketHandlerDecorator.tryCloseWithError(this.wsSession, ex, logger);
134135
}
135136
}
136137

spring-websocket/src/main/java/org/springframework/web/socket/adapter/standard/StandardWebSocketHandlerAdapter.java

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2013 the original author or authors.
2+
* Copyright 2002-2015 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -49,16 +49,15 @@ public class StandardWebSocketHandlerAdapter extends Endpoint {
4949

5050

5151
public StandardWebSocketHandlerAdapter(WebSocketHandler handler, StandardWebSocketSession wsSession) {
52-
Assert.notNull(handler, "handler must not be null");
53-
Assert.notNull(wsSession, "wsSession must not be null");
52+
Assert.notNull(handler, "WebSocketHandler must not be null");
53+
Assert.notNull(wsSession, "WebSocketSession must not be null");
5454
this.handler = handler;
5555
this.wsSession = wsSession;
5656
}
5757

5858

5959
@Override
6060
public void onOpen(final javax.websocket.Session session, EndpointConfig config) {
61-
6261
this.wsSession.initializeNativeSession(session);
6362

6463
if (this.handler.supportsPartialMessages()) {
@@ -100,9 +99,8 @@ public void onMessage(javax.websocket.PongMessage message) {
10099
try {
101100
this.handler.afterConnectionEstablished(this.wsSession);
102101
}
103-
catch (Throwable t) {
104-
ExceptionWebSocketHandlerDecorator.tryCloseWithError(this.wsSession, t, logger);
105-
return;
102+
catch (Throwable ex) {
103+
ExceptionWebSocketHandlerDecorator.tryCloseWithError(this.wsSession, ex, logger);
106104
}
107105
}
108106

@@ -111,8 +109,8 @@ private void handleTextMessage(javax.websocket.Session session, String payload,
111109
try {
112110
this.handler.handleMessage(this.wsSession, textMessage);
113111
}
114-
catch (Throwable t) {
115-
ExceptionWebSocketHandlerDecorator.tryCloseWithError(this.wsSession, t, logger);
112+
catch (Throwable ex) {
113+
ExceptionWebSocketHandlerDecorator.tryCloseWithError(this.wsSession, ex, logger);
116114
}
117115
}
118116

@@ -121,8 +119,8 @@ private void handleBinaryMessage(javax.websocket.Session session, ByteBuffer pay
121119
try {
122120
this.handler.handleMessage(this.wsSession, binaryMessage);
123121
}
124-
catch (Throwable t) {
125-
ExceptionWebSocketHandlerDecorator.tryCloseWithError(this.wsSession, t, logger);
122+
catch (Throwable ex) {
123+
ExceptionWebSocketHandlerDecorator.tryCloseWithError(this.wsSession, ex, logger);
126124
}
127125
}
128126

@@ -131,8 +129,8 @@ private void handlePongMessage(javax.websocket.Session session, ByteBuffer paylo
131129
try {
132130
this.handler.handleMessage(this.wsSession, pongMessage);
133131
}
134-
catch (Throwable t) {
135-
ExceptionWebSocketHandlerDecorator.tryCloseWithError(this.wsSession, t, logger);
132+
catch (Throwable ex) {
133+
ExceptionWebSocketHandlerDecorator.tryCloseWithError(this.wsSession, ex, logger);
136134
}
137135
}
138136

@@ -142,8 +140,10 @@ public void onClose(javax.websocket.Session session, CloseReason reason) {
142140
try {
143141
this.handler.afterConnectionClosed(this.wsSession, closeStatus);
144142
}
145-
catch (Throwable t) {
146-
logger.error("Unhandled error for " + this.wsSession, t);
143+
catch (Throwable ex) {
144+
if (logger.isErrorEnabled()) {
145+
logger.error("Unhandled error for " + this.wsSession, ex);
146+
}
147147
}
148148
}
149149

@@ -152,8 +152,8 @@ public void onError(javax.websocket.Session session, Throwable exception) {
152152
try {
153153
this.handler.handleTransportError(this.wsSession, exception);
154154
}
155-
catch (Throwable t) {
156-
ExceptionWebSocketHandlerDecorator.tryCloseWithError(this.wsSession, t, logger);
155+
catch (Throwable ex) {
156+
ExceptionWebSocketHandlerDecorator.tryCloseWithError(this.wsSession, ex, logger);
157157
}
158158
}
159159

spring-websocket/src/main/java/org/springframework/web/socket/handler/ExceptionWebSocketHandlerDecorator.java

Lines changed: 24 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2014 the original author or authors.
2+
* Copyright 2002-2015 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -25,16 +25,16 @@
2525
import org.springframework.web.socket.WebSocketSession;
2626

2727
/**
28-
* An exception handling {@link WebSocketHandlerDecorator}. Traps all {@link Throwable}
29-
* instances that escape from the decorated handler and closes the session with
30-
* {@link CloseStatus#SERVER_ERROR}.
28+
* An exception handling {@link WebSocketHandlerDecorator}.
29+
* Traps all {@link Throwable} instances that escape from the decorated
30+
* handler and closes the session with {@link CloseStatus#SERVER_ERROR}.
3131
*
3232
* @author Rossen Stoyanchev
3333
* @since 4.0
3434
*/
3535
public class ExceptionWebSocketHandlerDecorator extends WebSocketHandlerDecorator {
3636

37-
private final Log logger = LogFactory.getLog(ExceptionWebSocketHandlerDecorator.class);
37+
private static final Log logger = LogFactory.getLog(ExceptionWebSocketHandlerDecorator.class);
3838

3939

4040
public ExceptionWebSocketHandlerDecorator(WebSocketHandler delegate) {
@@ -52,20 +52,6 @@ public void afterConnectionEstablished(WebSocketSession session) {
5252
}
5353
}
5454

55-
public static void tryCloseWithError(WebSocketSession session, Throwable exception, Log logger) {
56-
if (logger.isDebugEnabled()) {
57-
logger.debug("Closing due to exception for " + session, exception);
58-
}
59-
if (session.isOpen()) {
60-
try {
61-
session.close(CloseStatus.SERVER_ERROR);
62-
}
63-
catch (Throwable t) {
64-
// ignore
65-
}
66-
}
67-
}
68-
6955
@Override
7056
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) {
7157
try {
@@ -91,8 +77,25 @@ public void afterConnectionClosed(WebSocketSession session, CloseStatus closeSta
9177
try {
9278
getDelegate().afterConnectionClosed(session, closeStatus);
9379
}
94-
catch (Throwable t) {
95-
logger.error("Unhandled error for " + this, t);
80+
catch (Throwable ex) {
81+
if (logger.isErrorEnabled()) {
82+
logger.error("Unhandled error for " + this, ex);
83+
}
84+
}
85+
}
86+
87+
88+
public static void tryCloseWithError(WebSocketSession session, Throwable exception, Log logger) {
89+
if (logger.isDebugEnabled()) {
90+
logger.debug("Closing due to exception for " + session, exception);
91+
}
92+
if (session.isOpen()) {
93+
try {
94+
session.close(CloseStatus.SERVER_ERROR);
95+
}
96+
catch (Throwable ex) {
97+
// ignore
98+
}
9699
}
97100
}
98101

0 commit comments

Comments
 (0)