Skip to content

Commit fb63e03

Browse files
authored
feat: resolve rabbit template dynamically by routing key (#1516)
* feat: enable rabbit template selection by routing key * feat: test fallback scenario and change order
1 parent a52fde5 commit fb63e03

File tree

2 files changed

+98
-3
lines changed

2 files changed

+98
-3
lines changed

springwolf-plugins/springwolf-amqp-plugin/src/main/java/io/github/springwolf/plugins/amqp/producer/SpringwolfAmqpProducer.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,21 +12,28 @@
1212
import org.springframework.util.CollectionUtils;
1313

1414
import java.util.List;
15+
import java.util.Map;
1516
import java.util.Optional;
17+
import java.util.stream.Collectors;
1618

1719
@Slf4j
1820
public class SpringwolfAmqpProducer {
1921

2022
private final AsyncApiService asyncApiService;
21-
private final Optional<RabbitTemplate> rabbitTemplate;
23+
private final Map<String, RabbitTemplate> rabbitTemplateRegistry;
2224

2325
public boolean isEnabled() {
24-
return rabbitTemplate.isPresent();
26+
return !rabbitTemplateRegistry.isEmpty();
2527
}
2628

2729
public SpringwolfAmqpProducer(AsyncApiService asyncApiService, List<RabbitTemplate> rabbitTemplates) {
2830
this.asyncApiService = asyncApiService;
29-
this.rabbitTemplate = rabbitTemplates.isEmpty() ? Optional.empty() : Optional.of(rabbitTemplates.get(0));
31+
this.rabbitTemplateRegistry = buildRabbitTemplateRegistry(rabbitTemplates);
32+
}
33+
34+
private Map<String, RabbitTemplate> buildRabbitTemplateRegistry(List<RabbitTemplate> templates) {
35+
return templates.stream()
36+
.collect(Collectors.toMap(RabbitTemplate::getRoutingKey, rabbitTemplate -> rabbitTemplate));
3037
}
3138

3239
public void send(String channelName, Object payload) {
@@ -44,6 +51,7 @@ public void send(String channelName, Object payload) {
4451
routingKey = channelName;
4552
}
4653

54+
Optional<RabbitTemplate> rabbitTemplate = getRabbitTemplate(routingKey);
4755
if (rabbitTemplate.isPresent()) {
4856
rabbitTemplate.get().convertAndSend(exchange, routingKey, payload);
4957
} else {
@@ -79,4 +87,9 @@ private String getRoutingKey(Operation operation) {
7987

8088
return routingKey;
8189
}
90+
91+
private Optional<RabbitTemplate> getRabbitTemplate(String routingKey) {
92+
return Optional.ofNullable(rabbitTemplateRegistry.get(routingKey))
93+
.or(() -> rabbitTemplateRegistry.values().stream().findFirst());
94+
}
8295
}

springwolf-plugins/springwolf-amqp-plugin/src/test/java/io/github/springwolf/plugins/amqp/producer/SpringwolfAmqpProducerTest.java

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,11 @@
1919
import java.util.List;
2020
import java.util.Map;
2121

22+
import static org.mockito.ArgumentMatchers.anyString;
2223
import static org.mockito.ArgumentMatchers.eq;
2324
import static org.mockito.ArgumentMatchers.same;
2425
import static org.mockito.Mockito.mock;
26+
import static org.mockito.Mockito.never;
2527
import static org.mockito.Mockito.verify;
2628
import static org.mockito.Mockito.when;
2729

@@ -114,4 +116,84 @@ void send_exchangeAndRoutingKeyFromBindings() {
114116

115117
verify(rabbitTemplate).convertAndSend(eq("exchange-name"), eq("routing-key"), same(payload));
116118
}
119+
120+
@Test
121+
void send_usesRabbitTemplateMatchingRoutingKey() {
122+
RabbitTemplate template1 = mock(RabbitTemplate.class);
123+
RabbitTemplate template2 = mock(RabbitTemplate.class);
124+
when(template1.getRoutingKey()).thenReturn("any-key");
125+
when(template2.getRoutingKey()).thenReturn("routing-key");
126+
127+
springwolfAmqpProducer = new SpringwolfAmqpProducer(asyncApiService, List.of(template1, template2));
128+
129+
AMQPChannelExchangeProperties properties = new AMQPChannelExchangeProperties();
130+
properties.setName("exchange-name");
131+
ChannelObject channelItem = ChannelObject.builder()
132+
.bindings(Map.of(
133+
"amqp",
134+
AMQPChannelBinding.builder().exchange(properties).build()))
135+
.build();
136+
Map<String, ChannelObject> channels = Map.of("channel-name", channelItem);
137+
Operation operation = Operation.builder()
138+
.bindings(Map.of(
139+
"amqp",
140+
AMQPOperationBinding.builder()
141+
.cc(List.of("routing-key"))
142+
.build()))
143+
.build();
144+
Map<String, Operation> operations = Map.of("amqp", operation);
145+
146+
AsyncAPI asyncAPI = AsyncAPI.builder()
147+
.info(new Info())
148+
.channels(channels)
149+
.operations(operations)
150+
.build();
151+
when(asyncApiService.getAsyncAPI()).thenReturn(asyncAPI);
152+
153+
Map<String, Object> payload = new HashMap<>();
154+
springwolfAmqpProducer.send("channel-name", payload);
155+
156+
verify(template1, never()).convertAndSend(anyString(), anyString(), same(payload));
157+
verify(template2).convertAndSend(eq("exchange-name"), eq("routing-key"), same(payload));
158+
}
159+
160+
@Test
161+
void send_fallsBackToFirstTemplateWhenNoMatch() {
162+
RabbitTemplate template1 = mock(RabbitTemplate.class);
163+
RabbitTemplate template2 = mock(RabbitTemplate.class);
164+
when(template1.getRoutingKey()).thenReturn("key-1");
165+
when(template2.getRoutingKey()).thenReturn("key-2");
166+
167+
springwolfAmqpProducer = new SpringwolfAmqpProducer(asyncApiService, List.of(template1, template2));
168+
169+
AMQPChannelExchangeProperties properties = new AMQPChannelExchangeProperties();
170+
properties.setName("exchange-name");
171+
ChannelObject channelItem = ChannelObject.builder()
172+
.bindings(Map.of(
173+
"amqp",
174+
AMQPChannelBinding.builder().exchange(properties).build()))
175+
.build();
176+
Map<String, ChannelObject> channels = Map.of("channel-name", channelItem);
177+
Operation operation = Operation.builder()
178+
.bindings(Map.of(
179+
"amqp",
180+
AMQPOperationBinding.builder()
181+
.cc(List.of("routing-key"))
182+
.build()))
183+
.build();
184+
Map<String, Operation> operations = Map.of("amqp", operation);
185+
186+
AsyncAPI asyncAPI = AsyncAPI.builder()
187+
.info(new Info())
188+
.channels(channels)
189+
.operations(operations)
190+
.build();
191+
when(asyncApiService.getAsyncAPI()).thenReturn(asyncAPI);
192+
193+
Map<String, Object> payload = new HashMap<>();
194+
springwolfAmqpProducer.send("channel-name", payload);
195+
196+
verify(template1).convertAndSend(eq("exchange-name"), eq("routing-key"), same(payload));
197+
verify(template2, never()).convertAndSend(anyString(), anyString(), same(payload));
198+
}
117199
}

0 commit comments

Comments
 (0)