Skip to content

Commit 2d9a5f6

Browse files
authored
Introduce a ReceiveMessageAdvice (#3265)
* Introduce a `ReceiveMessageAdvice` * Deprecate an `AbstractMessageSourceAdvice` in favor of `default` method in the `MessageSourceMutator` * Move a `applyReceiveOnlyAdviceChain()` logic into the `AbstractPollingEndpoint`: now both `PollingConsumer` and `SourcePollingChannelAdapter` can use `ReceiveMessageAdvice` * Introduce a `SimpleActiveIdleReceiveMessageAdvice` based already on the `ReceiveMessageAdvice` and deprecate a `SimpleActiveIdleMessageSourceAdvice` which is fully replaceable with newly introduced `SimpleActiveIdleReceiveMessageAdvice` * Add `@SuppressWarnings("deprecation")` for those out-of-the-box `ReceiveMessageAdvice` implementation which still use an `AbstractMessageSourceAdvice` for backward compatibility * Document a new feature and give the `MessageSourceMutator` a new meaning * * Fix language in the `polling-consumer.adoc`
1 parent cfd03f8 commit 2d9a5f6

File tree

16 files changed

+392
-126
lines changed

16 files changed

+392
-126
lines changed
Lines changed: 6 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2019 the original author or authors.
2+
* Copyright 2015-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,34 +16,20 @@
1616

1717
package org.springframework.integration.aop;
1818

19-
import org.aopalliance.intercept.MethodInterceptor;
20-
import org.aopalliance.intercept.MethodInvocation;
21-
2219
import org.springframework.integration.core.MessageSource;
23-
import org.springframework.messaging.Message;
2420

2521
/**
2622
* Advice for a {@link MessageSource#receive()} method to decide whether a poll
2723
* should be ignored and/or take action after the receive.
2824
*
2925
* @author Gary Russell
26+
* @author Artem Bilan
3027
*
3128
* @since 4.2
29+
*
30+
* @deprecated since 5.3 in favor of {@link MessageSourceMutator}.
3231
*/
33-
public abstract class AbstractMessageSourceAdvice implements MethodInterceptor, MessageSourceMutator {
34-
35-
@Override
36-
public final Object invoke(MethodInvocation invocation) throws Throwable {
37-
Object target = invocation.getThis();
38-
if (!(target instanceof MessageSource)) {
39-
return invocation.proceed();
40-
}
41-
42-
Message<?> result = null;
43-
if (beforeReceive((MessageSource<?>) target)) {
44-
result = (Message<?>) invocation.proceed();
45-
}
46-
return afterReceive(result, (MessageSource<?>) target);
47-
}
32+
@Deprecated
33+
public abstract class AbstractMessageSourceAdvice implements MessageSourceMutator {
4834

4935
}

spring-integration-core/src/main/java/org/springframework/integration/aop/CompoundTriggerAdvice.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2019 the original author or authors.
2+
* Copyright 2015-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,6 +18,7 @@
1818

1919
import org.springframework.integration.core.MessageSource;
2020
import org.springframework.integration.util.CompoundTrigger;
21+
import org.springframework.lang.Nullable;
2122
import org.springframework.messaging.Message;
2223
import org.springframework.scheduling.Trigger;
2324
import org.springframework.util.Assert;
@@ -35,7 +36,10 @@
3536
* @since 4.3
3637
*
3738
*/
38-
public class CompoundTriggerAdvice extends AbstractMessageSourceAdvice {
39+
@SuppressWarnings("deprecation")
40+
public class CompoundTriggerAdvice
41+
extends AbstractMessageSourceAdvice
42+
implements ReceiveMessageAdvice {
3943

4044
private final CompoundTrigger compoundTrigger;
4145

@@ -47,8 +51,21 @@ public CompoundTriggerAdvice(CompoundTrigger compoundTrigger, Trigger overrideTr
4751
this.override = overrideTrigger;
4852
}
4953

54+
/**
55+
* @param result the received message.
56+
* @param source the message source.
57+
* @return the message or null
58+
* @deprecated since 5.3 in favor of {@link #afterReceive(Message, Object)}
59+
*/
5060
@Override
61+
@Deprecated
5162
public Message<?> afterReceive(Message<?> result, MessageSource<?> source) {
63+
return afterReceive(result, (Object) source);
64+
}
65+
66+
@Override
67+
@Nullable
68+
public Message<?> afterReceive(@Nullable Message<?> result, Object source) {
5269
if (result == null) {
5370
this.compoundTrigger.setOverride(this.override);
5471
}

spring-integration-core/src/main/java/org/springframework/integration/aop/MessageSourceMutator.java

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2019 the original author or authors.
2+
* Copyright 2018-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,19 +17,31 @@
1717
package org.springframework.integration.aop;
1818

1919
import org.springframework.integration.core.MessageSource;
20+
import org.springframework.lang.Nullable;
2021
import org.springframework.messaging.Message;
2122

2223
/**
23-
* An object that can mutate a {@link MessageSource} before and/or after
24+
* A {@link ReceiveMessageAdvice} extension that can mutate a {@link MessageSource} before and/or after
2425
* {@link MessageSource#receive()} is called.
2526
*
2627
* @author Gary Russell
28+
* @author Artem Bilan
2729
*
28-
* @since 5.0.7.
29-
*
30+
* @since 5.0.7
3031
*/
3132
@FunctionalInterface
32-
public interface MessageSourceMutator {
33+
public interface MessageSourceMutator extends ReceiveMessageAdvice {
34+
35+
@Override
36+
default boolean beforeReceive(Object source) {
37+
if (source instanceof MessageSource<?>) {
38+
return beforeReceive((MessageSource<?>) source);
39+
}
40+
else {
41+
throw new IllegalArgumentException(
42+
"The 'MessageSourceMutator' supports only a 'MessageSource' in the before/after hooks: " + source);
43+
}
44+
}
3345

3446
/**
3547
* Subclasses can decide whether to proceed with this poll.
@@ -40,13 +52,26 @@ default boolean beforeReceive(MessageSource<?> source) {
4052
return true;
4153
}
4254

55+
@Override
56+
@Nullable
57+
default Message<?> afterReceive(@Nullable Message<?> result, Object source) {
58+
if (source instanceof MessageSource<?>) {
59+
return afterReceive(result, (MessageSource<?>) source);
60+
}
61+
else {
62+
throw new IllegalArgumentException(
63+
"The 'MessageSourceMutator' supports only a 'MessageSource' in the before/after hooks: " + source);
64+
}
65+
}
66+
4367
/**
4468
* Subclasses can take actions based on the result of the poll; e.g.
4569
* adjust the {@code trigger}. The message can also be replaced with a new one.
4670
* @param result the received message.
4771
* @param source the message source.
4872
* @return a message to continue to process the result, null to discard whatever the poll returned.
4973
*/
50-
Message<?> afterReceive(Message<?> result, MessageSource<?> source);
74+
@Nullable
75+
Message<?> afterReceive(@Nullable Message<?> result, MessageSource<?> source);
5176

5277
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.aop;
18+
19+
import org.aopalliance.intercept.MethodInterceptor;
20+
import org.aopalliance.intercept.MethodInvocation;
21+
22+
import org.springframework.integration.core.MessageSource;
23+
import org.springframework.lang.Nullable;
24+
import org.springframework.messaging.Message;
25+
import org.springframework.messaging.PollableChannel;
26+
27+
/**
28+
* An AOP advice to perform hooks before and/or after a {@code receive()} contract is called.
29+
*
30+
* @author Artem Bilan
31+
*
32+
* @since 5.3
33+
*/
34+
@FunctionalInterface
35+
public interface ReceiveMessageAdvice extends MethodInterceptor {
36+
37+
/**
38+
* Subclasses can decide whether to {@link MethodInvocation#proceed()} or not.
39+
* @param source the source of the message to receive.
40+
* @return true to proceed (default).
41+
*/
42+
default boolean beforeReceive(Object source) {
43+
return true;
44+
}
45+
46+
@Override
47+
@Nullable
48+
default Object invoke(MethodInvocation invocation) throws Throwable {
49+
Object target = invocation.getThis();
50+
if (!(target instanceof MessageSource) && !(target instanceof PollableChannel)) {
51+
return invocation.proceed();
52+
}
53+
54+
Message<?> result = null;
55+
if (beforeReceive(target)) {
56+
result = (Message<?>) invocation.proceed();
57+
}
58+
return afterReceive(result, target);
59+
}
60+
61+
/**
62+
* Subclasses can take actions based on the result of the {@link MethodInvocation#proceed()}; e.g.
63+
* adjust the {@code trigger}. The message can also be replaced with a new one.
64+
* @param result the received message.
65+
* @param source the source of the message to receive.
66+
* @return a message to continue to process the result, null to discard whatever
67+
* the {@link MethodInvocation#proceed()} returned.
68+
*/
69+
@Nullable
70+
Message<?> afterReceive(@Nullable Message<?> result, Object source);
71+
72+
}

spring-integration-core/src/main/java/org/springframework/integration/aop/SimpleActiveIdleMessageSourceAdvice.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2019 the original author or authors.
2+
* Copyright 2015-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -31,7 +31,11 @@
3131
* @since 4.2
3232
*
3333
* @see DynamicPeriodicTrigger
34+
*
35+
* @deprecated since 5.3 in favor of {@link SimpleActiveIdleReceiveMessageAdvice} with the same
36+
* (but more common) functionality.
3437
*/
38+
@Deprecated
3539
public class SimpleActiveIdleMessageSourceAdvice extends AbstractMessageSourceAdvice {
3640

3741
private final DynamicPeriodicTrigger trigger;
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.aop;
18+
19+
import java.time.Duration;
20+
21+
import org.springframework.integration.util.DynamicPeriodicTrigger;
22+
import org.springframework.messaging.Message;
23+
import org.springframework.util.Assert;
24+
25+
/**
26+
A simple advice that polls at one rate when messages exist and another when
27+
* there are no messages.
28+
*
29+
* @author Gary Russell
30+
* @author Artem Bilan
31+
*
32+
* @since 5.3
33+
*
34+
* @see DynamicPeriodicTrigger
35+
*/
36+
public class SimpleActiveIdleReceiveMessageAdvice implements ReceiveMessageAdvice {
37+
38+
private final DynamicPeriodicTrigger trigger;
39+
40+
private volatile Duration idlePollPeriod;
41+
42+
private volatile Duration activePollPeriod;
43+
44+
public SimpleActiveIdleReceiveMessageAdvice(DynamicPeriodicTrigger trigger) {
45+
Assert.notNull(trigger, "'trigger' must not be null");
46+
this.trigger = trigger;
47+
this.idlePollPeriod = trigger.getDuration();
48+
this.activePollPeriod = trigger.getDuration();
49+
}
50+
51+
/**
52+
* Set the poll period when messages are not returned. Defaults to the
53+
* trigger's period.
54+
* @param idlePollPeriod the period in milliseconds.
55+
*/
56+
public void setIdlePollPeriod(long idlePollPeriod) {
57+
this.idlePollPeriod = Duration.ofMillis(idlePollPeriod);
58+
}
59+
60+
/**
61+
* Set the poll period when messages are returned. Defaults to the
62+
* trigger's period.
63+
* @param activePollPeriod the period in milliseconds.
64+
*/
65+
public void setActivePollPeriod(long activePollPeriod) {
66+
this.activePollPeriod = Duration.ofMillis(activePollPeriod);
67+
}
68+
69+
@Override
70+
public Message<?> afterReceive(Message<?> result, Object source) {
71+
if (result == null) {
72+
this.trigger.setDuration(this.idlePollPeriod);
73+
}
74+
else {
75+
this.trigger.setDuration(this.activePollPeriod);
76+
}
77+
return result;
78+
}
79+
80+
}

0 commit comments

Comments
 (0)