21
21
import java .util .concurrent .Executor ;
22
22
23
23
import org .springframework .messaging .Message ;
24
- import org .springframework .messaging .MessageChannel ;
25
24
import org .springframework .messaging .MessageDeliveryException ;
26
25
import org .springframework .messaging .MessageHandler ;
27
26
import org .springframework .messaging .MessagingException ;
@@ -85,10 +84,9 @@ public void addInterceptor(ChannelInterceptor interceptor) {
85
84
86
85
87
86
@ Override
88
- public boolean sendInternal (final Message <?> message , long timeout ) {
89
- for (MessageHandler subscriber : getSubscribers ()) {
90
- ExecutorChannelInterceptorChain chain = new ExecutorChannelInterceptorChain ();
91
- SendTask sendTask = new SendTask (message , this , subscriber , chain );
87
+ public boolean sendInternal (Message <?> message , long timeout ) {
88
+ for (MessageHandler handler : getSubscribers ()) {
89
+ SendTask sendTask = new SendTask (message , handler );
92
90
if (this .executor == null ) {
93
91
sendTask .run ();
94
92
}
@@ -101,92 +99,75 @@ public boolean sendInternal(final Message<?> message, long timeout) {
101
99
102
100
103
101
/**
104
- * Helps with the invocation of configured executor channel interceptors .
102
+ * Invoke a MessageHandler with ExecutorChannelInterceptor's .
105
103
*/
106
- private class ExecutorChannelInterceptorChain {
107
-
108
- private int interceptorIndex = -1 ;
109
-
110
- public Message <?> applyBeforeHandle (Message <?> message , MessageChannel channel , MessageHandler handler ) {
111
- for (ExecutorChannelInterceptor interceptor : executorInterceptors ) {
112
- message = interceptor .beforeHandle (message , channel , handler );
113
- if (message == null ) {
114
- String name = interceptor .getClass ().getSimpleName ();
115
- if (logger .isDebugEnabled ()) {
116
- logger .debug (name + " returned null from beforeHandle, i.e. precluding the send." );
117
- }
118
- triggerAfterMessageHandled (message , channel , handler , null );
119
- return null ;
120
- }
121
- this .interceptorIndex ++;
122
- }
123
- return message ;
124
- }
125
-
126
- public void triggerAfterMessageHandled (Message <?> message , MessageChannel channel ,
127
- MessageHandler handler , Exception ex ) {
128
-
129
- for (int i = this .interceptorIndex ; i >= 0 ; i --) {
130
- ExecutorChannelInterceptor interceptor = executorInterceptors .get (i );
131
- try {
132
- interceptor .afterMessageHandled (message , channel , handler , ex );
133
- }
134
- catch (Throwable ex2 ) {
135
- logger .error ("Exception from afterMessageHandled in " + interceptor , ex2 );
136
- }
137
- }
138
- }
139
- }
140
-
141
-
142
- /**
143
- * Helps with the invocation of the target MessageHandler and interceptors.
144
- */
145
- private static class SendTask implements Runnable {
104
+ private class SendTask implements Runnable {
146
105
147
106
private final Message <?> inputMessage ;
148
107
149
- private final MessageChannel channel ;
150
-
151
108
private final MessageHandler handler ;
152
109
153
- private final ExecutorChannelInterceptorChain chain ;
110
+ private int interceptorIndex = - 1 ;
154
111
155
- public SendTask (Message <?> message , MessageChannel channel , MessageHandler handler ,
156
- ExecutorChannelInterceptorChain chain ) {
157
112
113
+ public SendTask (Message <?> message , MessageHandler handler ) {
158
114
this .inputMessage = message ;
159
- this .channel = channel ;
160
115
this .handler = handler ;
161
- this .chain = chain ;
162
116
}
163
117
164
118
@ Override
165
119
public void run () {
166
120
Message <?> message = this .inputMessage ;
167
121
try {
168
- message = chain . applyBeforeHandle (message , this . channel , this . handler );
122
+ message = applyBeforeHandle (message );
169
123
if (message == null ) {
170
124
return ;
171
125
}
172
126
this .handler .handleMessage (message );
173
- this . chain . triggerAfterMessageHandled (message , this . channel , this . handler , null );
127
+ triggerAfterMessageHandled (message , null );
174
128
}
175
129
catch (Exception ex ) {
176
- this . chain . triggerAfterMessageHandled (message , this . channel , this . handler , ex );
130
+ triggerAfterMessageHandled (message , ex );
177
131
if (ex instanceof MessagingException ) {
178
132
throw (MessagingException ) ex ;
179
133
}
180
- throw new MessageDeliveryException ( message ,
181
- "Failed to handle message to " + this . channel + " in " + this . handler , ex );
134
+ String description = "Failed to handle " + message + " to " + this + " in " + this . handler ;
135
+ throw new MessageDeliveryException ( message , description , ex );
182
136
}
183
137
catch (Error ex ) {
184
- this .chain .triggerAfterMessageHandled (message , this .channel , this .handler ,
185
- new MessageDeliveryException (message ,
186
- "Failed to handle message to " + this .channel + " in " + this .handler , ex ));
138
+ String description = "Failed to handle " + message + " to " + this + " in " + this .handler ;
139
+ triggerAfterMessageHandled (message , new MessageDeliveryException (message , description , ex ));
187
140
throw ex ;
188
141
}
189
142
}
143
+
144
+ private Message <?> applyBeforeHandle (Message <?> message ) {
145
+ for (ExecutorChannelInterceptor interceptor : executorInterceptors ) {
146
+ message = interceptor .beforeHandle (message , ExecutorSubscribableChannel .this , this .handler );
147
+ if (message == null ) {
148
+ String name = interceptor .getClass ().getSimpleName ();
149
+ if (logger .isDebugEnabled ()) {
150
+ logger .debug (name + " returned null from beforeHandle, i.e. precluding the send." );
151
+ }
152
+ triggerAfterMessageHandled (message , null );
153
+ return null ;
154
+ }
155
+ this .interceptorIndex ++;
156
+ }
157
+ return message ;
158
+ }
159
+
160
+ private void triggerAfterMessageHandled (Message <?> message , Exception ex ) {
161
+ for (int i = this .interceptorIndex ; i >= 0 ; i --) {
162
+ ExecutorChannelInterceptor interceptor = executorInterceptors .get (i );
163
+ try {
164
+ interceptor .afterMessageHandled (message , ExecutorSubscribableChannel .this , this .handler , ex );
165
+ }
166
+ catch (Throwable ex2 ) {
167
+ logger .error ("Exception from afterMessageHandled in " + interceptor , ex2 );
168
+ }
169
+ }
170
+ }
190
171
}
191
172
192
173
}
0 commit comments