Skip to content

Commit 246a6db

Browse files
committed
Selector header name is exposed for configuration
Issue: SPR-16732
1 parent 8748ba4 commit 246a6db

File tree

8 files changed

+144
-34
lines changed

8 files changed

+144
-34
lines changed

spring-messaging/src/main/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistry.java

Lines changed: 32 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -43,6 +43,7 @@
4343
import org.springframework.util.LinkedMultiValueMap;
4444
import org.springframework.util.MultiValueMap;
4545
import org.springframework.util.PathMatcher;
46+
import org.springframework.util.StringUtils;
4647

4748
/**
4849
* Implementation of {@link SubscriptionRegistry} that stores subscriptions
@@ -113,24 +114,25 @@ public int getCacheLimit() {
113114
}
114115

115116
/**
116-
* Configure the name of a selector header that a subscription message can
117-
* have in order to filter messages based on their headers. The value of the
118-
* header can use Spring EL expressions against message headers.
119-
* <p>For example the following expression expects a header called "foo" to
120-
* have the value "bar":
117+
* Configure the name of a header that a subscription message can have for
118+
* the purpose of filtering messages matched to the subscription. The header
119+
* value is expected to be a Spring EL boolean expression to be applied to
120+
* the headers of messages matched to the subscription.
121+
* <p>For example:
121122
* <pre>
122123
* headers.foo == 'bar'
123124
* </pre>
124-
* <p>By default this is set to "selector".
125+
* <p>By default this is set to "selector". You can set it to a different
126+
* name, or to {@code null} to turn off support for a selector header.
127+
* @param selectorHeaderName the name to use for a selector header
125128
* @since 4.2
126129
*/
127130
public void setSelectorHeaderName(String selectorHeaderName) {
128-
Assert.notNull(selectorHeaderName, "'selectorHeaderName' must not be null");
129-
this.selectorHeaderName = selectorHeaderName;
131+
this.selectorHeaderName = StringUtils.hasText(selectorHeaderName) ? selectorHeaderName : null;
130132
}
131133

132134
/**
133-
* Return the name for the selector header.
135+
* Return the name for the selector header name.
134136
* @since 4.2
135137
*/
136138
public String getSelectorHeaderName() {
@@ -142,25 +144,31 @@ public String getSelectorHeaderName() {
142144
protected void addSubscriptionInternal(
143145
String sessionId, String subsId, String destination, Message<?> message) {
144146

147+
Expression expression = getSelectorExpression(message.getHeaders());
148+
this.subscriptionRegistry.addSubscription(sessionId, subsId, destination, expression);
149+
this.destinationCache.updateAfterNewSubscription(destination, sessionId, subsId);
150+
}
151+
152+
private Expression getSelectorExpression(MessageHeaders headers) {
145153
Expression expression = null;
146-
MessageHeaders headers = message.getHeaders();
147-
String selector = SimpMessageHeaderAccessor.getFirstNativeHeader(getSelectorHeaderName(), headers);
148-
if (selector != null) {
149-
try {
150-
expression = this.expressionParser.parseExpression(selector);
151-
this.selectorHeaderInUse = true;
152-
if (logger.isTraceEnabled()) {
153-
logger.trace("Subscription selector: [" + selector + "]");
154+
if (getSelectorHeaderName() != null) {
155+
String selector = SimpMessageHeaderAccessor.getFirstNativeHeader(getSelectorHeaderName(), headers);
156+
if (selector != null) {
157+
try {
158+
expression = this.expressionParser.parseExpression(selector);
159+
this.selectorHeaderInUse = true;
160+
if (logger.isTraceEnabled()) {
161+
logger.trace("Subscription selector: [" + selector + "]");
162+
}
154163
}
155-
}
156-
catch (Throwable ex) {
157-
if (logger.isDebugEnabled()) {
158-
logger.debug("Failed to parse selector: " + selector, ex);
164+
catch (Throwable ex) {
165+
if (logger.isDebugEnabled()) {
166+
logger.debug("Failed to parse selector: " + selector, ex);
167+
}
159168
}
160169
}
161170
}
162-
this.subscriptionRegistry.addSubscription(sessionId, subsId, destination, expression);
163-
this.destinationCache.updateAfterNewSubscription(destination, sessionId, subsId);
171+
return expression;
164172
}
165173

166174
@Override

spring-messaging/src/main/java/org/springframework/messaging/simp/broker/SimpleBrokerMessageHandler.java

Lines changed: 38 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -51,23 +51,27 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler {
5151

5252
private static final byte[] EMPTY_PAYLOAD = new byte[0];
5353

54-
private final Map<String, SessionInfo> sessions = new ConcurrentHashMap<String, SessionInfo>();
55-
56-
private SubscriptionRegistry subscriptionRegistry;
5754

5855
private PathMatcher pathMatcher;
5956

6057
private Integer cacheLimit;
6158

59+
private String selectorHeaderName = "selector";
60+
6261
private TaskScheduler taskScheduler;
6362

6463
private long[] heartbeatValue;
6564

66-
private ScheduledFuture<?> heartbeatFuture;
67-
6865
private MessageHeaderInitializer headerInitializer;
6966

7067

68+
private SubscriptionRegistry subscriptionRegistry;
69+
70+
private final Map<String, SessionInfo> sessions = new ConcurrentHashMap<String, SessionInfo>();
71+
72+
private ScheduledFuture<?> heartbeatFuture;
73+
74+
7175
/**
7276
* Create a SimpleBrokerMessageHandler instance with the given message channels
7377
* and destination prefixes.
@@ -96,12 +100,40 @@ public void setSubscriptionRegistry(SubscriptionRegistry subscriptionRegistry) {
96100
this.subscriptionRegistry = subscriptionRegistry;
97101
initPathMatcherToUse();
98102
initCacheLimitToUse();
103+
initSelectorHeaderNameToUse();
99104
}
100105

101106
public SubscriptionRegistry getSubscriptionRegistry() {
102107
return this.subscriptionRegistry;
103108
}
104109

110+
/**
111+
* Configure the name of a header that a subscription message can have for
112+
* the purpose of filtering messages matched to the subscription. The header
113+
* value is expected to be a Spring EL boolean expression to be applied to
114+
* the headers of messages matched to the subscription.
115+
* <p>For example:
116+
* <pre>
117+
* headers.foo == 'bar'
118+
* </pre>
119+
* <p>By default this is set to "selector". You can set it to a different
120+
* name, or to {@code null} to turn off support for a selector header.
121+
* @param selectorHeaderName the name to use for a selector header
122+
* @since 4.3.17
123+
* @see #setSubscriptionRegistry
124+
* @see DefaultSubscriptionRegistry#setSelectorHeaderName(String)
125+
*/
126+
public void setSelectorHeaderName(String selectorHeaderName) {
127+
this.selectorHeaderName = selectorHeaderName;
128+
initSelectorHeaderNameToUse();
129+
}
130+
131+
private void initSelectorHeaderNameToUse() {
132+
if (this.subscriptionRegistry instanceof DefaultSubscriptionRegistry) {
133+
((DefaultSubscriptionRegistry) this.subscriptionRegistry).setSelectorHeaderName(this.selectorHeaderName);
134+
}
135+
}
136+
105137
/**
106138
* When configured, the given PathMatcher is passed down to the underlying
107139
* SubscriptionRegistry to use for matching destination to subscriptions.

spring-messaging/src/main/java/org/springframework/messaging/simp/config/SimpleBrokerRegistration.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2014 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -33,6 +33,8 @@ public class SimpleBrokerRegistration extends AbstractBrokerRegistration {
3333

3434
private long[] heartbeat;
3535

36+
private String selectorHeaderName = "selector";
37+
3638

3739
public SimpleBrokerRegistration(SubscribableChannel inChannel, MessageChannel outChannel, String[] prefixes) {
3840
super(inChannel, outChannel, prefixes);
@@ -65,6 +67,24 @@ public SimpleBrokerRegistration setHeartbeatValue(long[] heartbeat) {
6567
return this;
6668
}
6769

70+
/**
71+
* Configure the name of a header that a subscription message can have for
72+
* the purpose of filtering messages matched to the subscription. The header
73+
* value is expected to be a Spring EL boolean expression to be applied to
74+
* the headers of messages matched to the subscription.
75+
* <p>For example:
76+
* <pre>
77+
* headers.foo == 'bar'
78+
* </pre>
79+
* <p>By default this is set to "selector". You can set it to a different
80+
* name, or to {@code null} to turn off support for a selector header.
81+
* @param selectorHeaderName the name to use for a selector header
82+
* @since 4.3.17
83+
*/
84+
public void setSelectorHeaderName(String selectorHeaderName) {
85+
this.selectorHeaderName = selectorHeaderName;
86+
}
87+
6888

6989
@Override
7090
protected SimpleBrokerMessageHandler getMessageHandler(SubscribableChannel brokerChannel) {
@@ -76,6 +96,7 @@ protected SimpleBrokerMessageHandler getMessageHandler(SubscribableChannel broke
7696
if (this.heartbeat != null) {
7797
handler.setHeartbeatValue(this.heartbeat);
7898
}
99+
handler.setSelectorHeaderName(this.selectorHeaderName);
79100
return handler;
80101
}
81102

spring-messaging/src/test/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistryTests.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,8 @@ public void registerSubscriptionWithSelector() throws Exception {
264264
String destination = "/foo";
265265
String selector = "headers.foo == 'bar'";
266266

267+
// First, try with selector header
268+
267269
this.registry.registerSubscription(subscribeMessage(sessionId, subscriptionId, destination, selector));
268270

269271
SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.create();
@@ -276,11 +278,34 @@ public void registerSubscriptionWithSelector() throws Exception {
276278
assertEquals(1, actual.size());
277279
assertEquals(Collections.singletonList(subscriptionId), actual.get(sessionId));
278280

281+
// Then without
282+
279283
actual = this.registry.findSubscriptions(createMessage(destination));
280284
assertNotNull(actual);
281285
assertEquals(0, actual.size());
282286
}
283287

288+
@Test
289+
public void registerSubscriptionWithSelectorNotSupported() {
290+
String sessionId = "sess01";
291+
String subscriptionId = "subs01";
292+
String destination = "/foo";
293+
String selector = "headers.foo == 'bar'";
294+
295+
this.registry.setSelectorHeaderName(null);
296+
this.registry.registerSubscription(subscribeMessage(sessionId, subscriptionId, destination, selector));
297+
298+
SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.create();
299+
accessor.setDestination(destination);
300+
accessor.setNativeHeader("foo", "bazz");
301+
Message<?> message = MessageBuilder.createMessage("", accessor.getMessageHeaders());
302+
303+
MultiValueMap<String, String> actual = this.registry.findSubscriptions(message);
304+
assertNotNull(actual);
305+
assertEquals(1, actual.size());
306+
assertEquals(Collections.singletonList(subscriptionId), actual.get(sessionId));
307+
}
308+
284309
@Test // SPR-11931
285310
public void registerSubscriptionTwiceAndUnregister() {
286311
this.registry.registerSubscription(subscribeMessage("sess01", "subs01", "/foo"));

spring-websocket/src/main/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParser.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -381,6 +381,10 @@ private RootBeanDefinition registerMessageBroker(Element brokerElement,
381381
String heartbeatValue = simpleBrokerElem.getAttribute("heartbeat");
382382
brokerDef.getPropertyValues().add("heartbeatValue", heartbeatValue);
383383
}
384+
if (simpleBrokerElem.hasAttribute("selector-header")) {
385+
String headerName = simpleBrokerElem.getAttribute("selector-header");
386+
brokerDef.getPropertyValues().add("selectorHeaderName", headerName);
387+
}
384388
}
385389
else if (brokerRelayElem != null) {
386390
String prefix = brokerRelayElem.getAttribute("prefix");

spring-websocket/src/main/resources/org/springframework/web/socket/config/spring-websocket-4.3.xsd

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -384,6 +384,22 @@
384384
]]></xsd:documentation>
385385
</xsd:annotation>
386386
</xsd:attribute>
387+
<xsd:attribute name="selector-header" type="xsd:string">
388+
<xsd:annotation>
389+
<xsd:documentation source="java:org.springframework.messaging.simp.stomp.SimpleBrokerMessageHandler"><![CDATA[
390+
Configure the name of a header that a subscription message can have for
391+
the purpose of filtering messages matched to the subscription. The header
392+
value is expected to be a Spring EL boolean expression to be applied to
393+
the headers of messages matched to the subscription.
394+
395+
For example:
396+
headers.foo == 'bar'
397+
398+
By default this is set to "selector". You can set it to a different
399+
name, or to "" to turn off support for a selector header.
400+
]]></xsd:documentation>
401+
</xsd:annotation>
402+
</xsd:attribute>
387403
</xsd:complexType>
388404

389405
<xsd:complexType name="channel">

spring-websocket/src/test/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParserTests.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -45,6 +45,7 @@
4545
import org.springframework.messaging.handler.invocation.HandlerMethodReturnValueHandler;
4646
import org.springframework.messaging.simp.SimpMessagingTemplate;
4747
import org.springframework.messaging.simp.annotation.support.SimpAnnotationMethodMessageHandler;
48+
import org.springframework.messaging.simp.broker.DefaultSubscriptionRegistry;
4849
import org.springframework.messaging.simp.broker.SimpleBrokerMessageHandler;
4950
import org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler;
5051
import org.springframework.messaging.simp.user.DefaultUserDestinationResolver;
@@ -199,6 +200,8 @@ public void simpleBroker() throws Exception {
199200
assertNotNull(brokerMessageHandler);
200201
Collection<String> prefixes = brokerMessageHandler.getDestinationPrefixes();
201202
assertEquals(Arrays.asList("/topic", "/queue"), new ArrayList<String>(prefixes));
203+
DefaultSubscriptionRegistry registry = (DefaultSubscriptionRegistry) brokerMessageHandler.getSubscriptionRegistry();
204+
assertEquals("my-selector", registry.getSelectorHeaderName());
202205
assertNotNull(brokerMessageHandler.getTaskScheduler());
203206
assertArrayEquals(new long[] {15000, 15000}, brokerMessageHandler.getHeartbeatValue());
204207

spring-websocket/src/test/resources/org/springframework/web/socket/config/websocket-config-broker-simple.xml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@
3535

3636
<websocket:stomp-error-handler ref="errorHandler" />
3737

38-
<websocket:simple-broker prefix="/topic, /queue" heartbeat="15000,15000" scheduler="scheduler" />
38+
<websocket:simple-broker prefix="/topic, /queue" selector-header="my-selector"
39+
heartbeat="15000,15000" scheduler="scheduler" />
3940

4041
</websocket:message-broker>
4142

0 commit comments

Comments
 (0)