Skip to content

Commit 9f86108

Browse files
authored
GH-10090: Add Java DSL for AMQP 1.0 adapters (#10468)
* GH-10090: Add Java DSL for AMQP 1.0 adapters Fixes: #10090 * Rework `amqp-1.0.adoc` code snippets to tabs to demonstrate AMQP 1.0 channel adapters with different supported DSLs * And Javadocs to AMQP 1.0 Java DSL classes * Remove from `AmqpClientInboundGateway` wrong delegate for the `taskScheduler` since gateway does not do batches * Remove `taskScheduler` property from the `AmqpClientInboundGatewaySpec` * Correct `AmqpClientMessageHandler` Javadoc about `SmartMessageConverter` * Fix typos in Javadocs
1 parent 5d59630 commit 9f86108

File tree

8 files changed

+1148
-14
lines changed

8 files changed

+1148
-14
lines changed
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Copyright 2025-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.amqp.dsl;
18+
19+
import org.springframework.amqp.core.AsyncAmqpTemplate;
20+
import org.springframework.amqp.rabbitmq.client.AmqpConnectionFactory;
21+
22+
/**
23+
* Factory class for AMQP 1.0 components.
24+
*
25+
* @author Artem Bilan
26+
*
27+
* @since 7.0
28+
*/
29+
public final class AmqpClient {
30+
31+
/**
32+
* Create an initial {@link AmqpClientInboundGatewaySpec}.
33+
* @param connectionFactory the connectionFactory.
34+
* @param queueNames the queueNames.
35+
* @return the AmqpClientInboundGatewaySpec.
36+
*/
37+
public static AmqpClientInboundGatewaySpec inboundGateway(AmqpConnectionFactory connectionFactory,
38+
String... queueNames) {
39+
40+
return new AmqpClientInboundGatewaySpec(connectionFactory, queueNames);
41+
}
42+
43+
/**
44+
* Create an initial AmqpClientInboundChannelAdapterSpec.
45+
* @param connectionFactory the connectionFactory.
46+
* @param queueNames the queues to consume from.
47+
* @return the AmqpClientInboundChannelAdapterSpec.
48+
*/
49+
public static AmqpClientInboundChannelAdapterSpec inboundChannelAdapter(AmqpConnectionFactory connectionFactory,
50+
String... queueNames) {
51+
52+
return new AmqpClientInboundChannelAdapterSpec(connectionFactory, queueNames);
53+
}
54+
55+
/**
56+
* Create an initial AmqpClientMessageHandlerSpec in an outbound channel adapter mode.
57+
* @param amqpTemplate the amqpTemplate.
58+
* @return the AmqpClientMessageHandlerSpec.
59+
*/
60+
public static AmqpClientMessageHandlerSpec outboundAdapter(AsyncAmqpTemplate amqpTemplate) {
61+
return new AmqpClientMessageHandlerSpec(amqpTemplate);
62+
}
63+
64+
/**
65+
* Create an initial AmqpClientMessageHandlerSpec in a gateway mode.
66+
* @param amqpTemplate the amqpTemplate.
67+
* @return the AmqpClientMessageHandlerSpec.
68+
*/
69+
public static AmqpClientMessageHandlerSpec outboundGateway(AsyncAmqpTemplate amqpTemplate) {
70+
AmqpClientMessageHandlerSpec amqpClientMessageHandlerSpec = new AmqpClientMessageHandlerSpec(amqpTemplate);
71+
amqpClientMessageHandlerSpec.getObject().setRequiresReply(true);
72+
return amqpClientMessageHandlerSpec;
73+
}
74+
75+
private AmqpClient() {
76+
}
77+
78+
}
Lines changed: 212 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,212 @@
1+
/*
2+
* Copyright 2025-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.amqp.dsl;
18+
19+
import java.time.Duration;
20+
21+
import com.rabbitmq.client.amqp.Resource;
22+
import org.aopalliance.aop.Advice;
23+
import org.jspecify.annotations.Nullable;
24+
25+
import org.springframework.amqp.core.MessagePostProcessor;
26+
import org.springframework.amqp.rabbitmq.client.AmqpConnectionFactory;
27+
import org.springframework.amqp.support.converter.MessageConverter;
28+
import org.springframework.integration.amqp.inbound.AmqpClientMessageProducer;
29+
import org.springframework.integration.amqp.support.AmqpHeaderMapper;
30+
import org.springframework.integration.dsl.MessageProducerSpec;
31+
import org.springframework.scheduling.TaskScheduler;
32+
33+
/**
34+
* Spec for an {@link AmqpClientMessageProducer}.
35+
*
36+
* @author Artem Bilan
37+
*
38+
* @since 7.0
39+
*/
40+
public class AmqpClientInboundChannelAdapterSpec
41+
extends MessageProducerSpec<AmqpClientInboundChannelAdapterSpec, AmqpClientMessageProducer> {
42+
43+
/**
44+
* Create an instance based on a {@link AmqpConnectionFactory} and queues to consume from.
45+
* @param connectionFactory the {@link AmqpConnectionFactory} to connect
46+
* @param queueNames queues to consume from
47+
*/
48+
public AmqpClientInboundChannelAdapterSpec(AmqpConnectionFactory connectionFactory, String... queueNames) {
49+
super(new AmqpClientMessageProducer(connectionFactory, queueNames));
50+
}
51+
52+
/**
53+
* The initial number credits to grant to the AMQP receiver.
54+
* The default is {@code 100}.
55+
* @param initialCredits number of initial credits
56+
* @return the spec
57+
*/
58+
public AmqpClientInboundChannelAdapterSpec initialCredits(int initialCredits) {
59+
this.target.setInitialCredits(initialCredits);
60+
return this;
61+
}
62+
63+
/**
64+
* The consumer priority.
65+
* @param priority consumer priority
66+
* @return the spec
67+
*/
68+
public AmqpClientInboundChannelAdapterSpec priority(int priority) {
69+
this.target.setPriority(priority);
70+
return this;
71+
}
72+
73+
/**
74+
* Add {@link Resource.StateListener} instances to the consumer.
75+
* @param stateListeners listeners to add
76+
* @return the spec
77+
*/
78+
public AmqpClientInboundChannelAdapterSpec stateListeners(Resource.StateListener... stateListeners) {
79+
this.target.setStateListeners(stateListeners);
80+
return this;
81+
}
82+
83+
/**
84+
* Add {@link MessagePostProcessor} instances to apply on just received messages.
85+
* @param afterReceivePostProcessors listeners to add
86+
* @return the spec
87+
*/
88+
public AmqpClientInboundChannelAdapterSpec afterReceivePostProcessors(
89+
MessagePostProcessor... afterReceivePostProcessors) {
90+
91+
this.target.setAfterReceivePostProcessors(afterReceivePostProcessors);
92+
return this;
93+
}
94+
95+
/**
96+
* Set a number of AMQP messages to gather before producing as a single message downstream.
97+
* Default 1 - no batching.
98+
* @param batchSize the batch size to use.
99+
* @return the spec
100+
* @see #batchReceiveTimeout(long)
101+
*/
102+
public AmqpClientInboundChannelAdapterSpec batchSize(int batchSize) {
103+
this.target.setBatchSize(batchSize);
104+
return this;
105+
}
106+
107+
/**
108+
* Set a timeout in milliseconds for how long a batch gathering process should go.
109+
* Therefore, the batch is released as a single message whatever first happens:
110+
* this timeout or {@link #batchSize(int)}.
111+
* Default 30 seconds.
112+
* @param batchReceiveTimeout the timeout for gathering a batch.
113+
* @return the spec
114+
*/
115+
public AmqpClientInboundChannelAdapterSpec batchReceiveTimeout(long batchReceiveTimeout) {
116+
this.target.setBatchReceiveTimeout(batchReceiveTimeout);
117+
return this;
118+
}
119+
120+
/**
121+
* Set a {@link TaskScheduler} for monitoring batch releases.
122+
* @param taskScheduler the taskScheduler to use
123+
* @return the spec
124+
*/
125+
public AmqpClientInboundChannelAdapterSpec taskScheduler(TaskScheduler taskScheduler) {
126+
this.target.setTaskScheduler(taskScheduler);
127+
return this;
128+
}
129+
130+
/**
131+
* Set {@link Advice} instances to proxy message listener.
132+
* @param advices the advices to add
133+
* @return the spec
134+
*/
135+
public AmqpClientInboundChannelAdapterSpec adviceChain(Advice... advices) {
136+
this.target.setAdviceChain(advices);
137+
return this;
138+
}
139+
140+
/**
141+
* Set to {@code false} to propagate an acknowledgement callback into message headers
142+
* for downstream flow manual settlement.
143+
* @param autoSettle {@code true} to acknowledge messages automatically.
144+
* @return the spec
145+
*/
146+
public AmqpClientInboundChannelAdapterSpec autoSettle(boolean autoSettle) {
147+
this.target.setAutoSettle(autoSettle);
148+
return this;
149+
}
150+
151+
/**
152+
* Set the default behavior when a message processing has failed.
153+
* When true, messages will be requeued, when false, they will be discarded.
154+
* This option can be overruled by throwing
155+
* {@link org.springframework.amqp.AmqpRejectAndDontRequeueException} or
156+
* {@link org.springframework.amqp.ImmediateRequeueAmqpException} from the downstream flow.
157+
* Default true.
158+
* @param defaultRequeue true to requeue by default.
159+
* @return the spec
160+
*/
161+
public AmqpClientInboundChannelAdapterSpec defaultRequeue(boolean defaultRequeue) {
162+
this.target.setDefaultRequeue(defaultRequeue);
163+
return this;
164+
}
165+
166+
/**
167+
* Set a duration for how long to wait for all the consumers to shut down successfully on listener container stop.
168+
* Default 30 seconds.
169+
* @param gracefulShutdownPeriod the timeout to wait on stop.
170+
* @return the spec
171+
*/
172+
public AmqpClientInboundChannelAdapterSpec gracefulShutdownPeriod(Duration gracefulShutdownPeriod) {
173+
this.target.setGracefulShutdownPeriod(gracefulShutdownPeriod);
174+
return this;
175+
}
176+
177+
/**
178+
* Each queue runs in its own consumer; set this property to create multiple
179+
* consumers for each queue.
180+
* Can be treated as {@code concurrency}, but per queue.
181+
* @param consumersPerQueue the consumers per queue.
182+
* @return the spec
183+
*/
184+
public AmqpClientInboundChannelAdapterSpec consumersPerQueue(int consumersPerQueue) {
185+
this.target.setConsumersPerQueue(consumersPerQueue);
186+
return this;
187+
}
188+
189+
/**
190+
* Set a {@link MessageConverter} to replace the default
191+
* {@link org.springframework.amqp.support.converter.SimpleMessageConverter}.
192+
* If set to null, an AMQP message is sent as is into a message payload.
193+
* And a reply message has to return an AMQP message as its payload.
194+
* @param messageConverter the {@link MessageConverter} to use or null.
195+
* @return the spec
196+
*/
197+
public AmqpClientInboundChannelAdapterSpec messageConverter(@Nullable MessageConverter messageConverter) {
198+
this.target.setMessageConverter(messageConverter);
199+
return this;
200+
}
201+
202+
/**
203+
* Set an {@link AmqpHeaderMapper} to map request headers.
204+
* @param headerMapper the {@link AmqpHeaderMapper} to use.
205+
* @return the spec
206+
*/
207+
public AmqpClientInboundChannelAdapterSpec headerMapper(AmqpHeaderMapper headerMapper) {
208+
this.target.setHeaderMapper(headerMapper);
209+
return this;
210+
}
211+
212+
}

0 commit comments

Comments
 (0)