Skip to content

Commit b41268b

Browse files
committed
refactor: refactor rabbitmq infrastructure
1 parent 4432b14 commit b41268b

File tree

18 files changed

+152
-142
lines changed

18 files changed

+152
-142
lines changed

booking.rest

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11

2-
@api-gateway=http://localhost:8081
2+
@api-gateway=http://localhost:8081
33
@keycloak-api=http://localhost:8080
44
@flight-api=http://localhost:8082
55
@passenger-api=http://localhost:8083
@@ -25,9 +25,9 @@ POST {{keycloak-api}}/realms/keycloak-realm/protocol/openid-connect/token
2525
Content-Type: application/x-www-form-urlencoded
2626

2727
grant_type=client_credentials
28-
&client_id=flight-client-credentials
29-
&client_secret=Glcz8z8E6alTIfay1GdMc6XTTeuZrgOs
30-
&scope=openid
28+
&client_id=flight-client-credentials
29+
&client_secret=Glcz8z8E6alTIfay1GdMc6XTTeuZrgOs
30+
&scope=openid
3131
###
3232

3333

@@ -97,13 +97,13 @@ authorization: bearer {{Authenticate.response.body.access_token}}
9797

9898
###
9999
# @name Create_Flights
100-
POST {{flight-api}}/api/v1/flight
100+
POST {{api-gateway}}/api/v1/flight
101101
accept: application/json
102102
Content-Type: application/json
103103
authorization: bearer {{Authenticate.response.body.access_token}}
104104

105105
{
106-
"flightNumber": "3dd5555655533",
106+
"flightNumber": "20BB",
107107
"aircraftId": {{aircraftId}},
108108
"departureAirportId": {{departureAirportId}},
109109
"departureDate": "2022-03-01T14:55:41.255Z",
@@ -119,13 +119,13 @@ authorization: bearer {{Authenticate.response.body.access_token}}
119119

120120
###
121121
# @name Update_Flights
122-
PUT {{flight-api}}/api/v1/flight/019498d8-2a28-7f0b-ac0c-2f8352c13c79
122+
PUT {{api-gateway}}/api/v1/flight/01949849-1608-7e16-975a-e7f4cf1d029d
123123
accept: application/json
124124
Content-Type: application/json
125125
authorization: bearer {{Authenticate.response.body.access_token}}
126126

127127
{
128-
"flightNumber": "2222222222222222",
128+
"flightNumber": "20BB",
129129
"aircraftId": {{aircraftId}},
130130
"departureAirportId": {{departureAirportId}},
131131
"departureDate": "2025-01-24T12:35:11.803Z",
@@ -134,7 +134,7 @@ authorization: bearer {{Authenticate.response.body.access_token}}
134134
"durationMinutes": 140,
135135
"flightDate": "2025-01-24T12:35:11.803Z",
136136
"status": "Flying",
137-
"price": 9000,
137+
"price": 8000,
138138
"isDeleted": false
139139
}
140140
###

src/buildingblocks/src/main/java/buildingblocks/core/event/EventDispatcherConfiguration.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import org.hibernate.engine.spi.PersistenceContext;
66
import org.hibernate.event.spi.PersistContext;
77
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingClass;
8+
import org.springframework.context.ApplicationContext;
89
import org.springframework.context.annotation.Bean;
910
import org.springframework.context.annotation.Configuration;
1011

@@ -13,7 +14,7 @@ public class EventDispatcherConfiguration {
1314

1415
@Bean
1516
@ConditionalOnMissingClass
16-
public EventDispatcher eventDispatcher(EventMapper eventMapper, PersistMessageProcessor persistMessageProcessor) {
17-
return new EventDispatcherImpl(eventMapper, persistMessageProcessor);
17+
public EventDispatcher eventDispatcher(EventMapper eventMapper, PersistMessageProcessor persistMessageProcessor, ApplicationContext applicationContext) {
18+
return new EventDispatcherImpl(eventMapper, persistMessageProcessor, applicationContext);
1819
}
1920
}

src/buildingblocks/src/main/java/buildingblocks/core/event/EventDispatcherImpl.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import org.reflections.Reflections;
88
import org.reflections.scanners.SubTypesScanner;
99
import org.reflections.util.ConfigurationBuilder;
10+
import org.springframework.context.ApplicationContext;
1011
import org.springframework.context.annotation.Import;
1112
import org.springframework.stereotype.Component;
1213

@@ -22,11 +23,13 @@
2223
public class EventDispatcherImpl implements EventDispatcher {
2324
private final EventMapper eventMapper;
2425
private final PersistMessageProcessor persistMessageProcessor;
26+
private final ApplicationContext applicationContext;
2527

26-
public EventDispatcherImpl(EventMapper eventMapper, PersistMessageProcessor persistMessageProcessor) {
28+
public EventDispatcherImpl(EventMapper eventMapper, PersistMessageProcessor persistMessageProcessor, ApplicationContext applicationContext) {
2729
this.eventMapper = eventMapper;
2830
this.persistMessageProcessor = persistMessageProcessor;
29-
}
31+
this.applicationContext = applicationContext;
32+
}
3033

3134
@Override
3235
public <T extends DomainEvent> void send(List<T> domainEvents, Class<?> eventType) {
@@ -42,13 +45,13 @@ public <T extends DomainEvent> void send(List<T> domainEvents, Class<?> eventTyp
4245

4346
@Override
4447
public List<DomainEvent> getDomainEvents() {
45-
AggregateRoot<?> aggregateRoot = ReflectionUtils.getInstanceOfSubclass(AggregateRoot.class);
48+
AggregateRoot<?> aggregateRoot = ReflectionUtils.getInstanceOfSubclass(AggregateRoot.class, applicationContext);
4649
return Objects.requireNonNull(aggregateRoot).getDomainEvents();
4750
}
4851

4952
@Override
5053
public void clearDomainEvents() {
51-
AggregateRoot<?> aggregateRoot = ReflectionUtils.getInstanceOfSubclass(AggregateRoot.class);
54+
AggregateRoot<?> aggregateRoot = ReflectionUtils.getInstanceOfSubclass(AggregateRoot.class, applicationContext);
5255
Objects.requireNonNull(aggregateRoot).clearDomainEvents();
5356
}
5457
}

src/buildingblocks/src/main/java/buildingblocks/outboxprocessor/PersistMessageProcessorImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ private boolean processOutbox(PersistMessageEntity message) {
152152
if (data instanceof IntegrationEvent integrationEvent) {
153153

154154
this.rabbitTemplate.convertSendAndReceive(
155-
message.getDataType(),
155+
rabbitProperties.getTemplate().getExchange(),
156156
message.getDataType(),
157157
JsonConverterUtils.serializeObject(integrationEvent), msg -> {
158158
MessageProperties props = msg.getMessageProperties();

src/buildingblocks/src/main/java/buildingblocks/rabbitmq/RabbitmqConfiguration.java

Lines changed: 47 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,12 @@
1515
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
1616
import org.springframework.beans.factory.annotation.Value;
1717
import org.springframework.boot.autoconfigure.amqp.RabbitProperties;
18+
import org.springframework.context.ApplicationContext;
1819
import org.springframework.context.annotation.*;
1920
import org.springframework.transaction.PlatformTransactionManager;
2021
import org.springframework.transaction.support.TransactionTemplate;
21-
2222
import java.util.HashMap;
23+
import java.util.Map;
2324
import java.util.UUID;
2425
import java.util.function.Supplier;
2526

@@ -42,7 +43,6 @@ public RabbitmqConfiguration(
4243
this.logger = logger;
4344
}
4445

45-
4646
@Bean
4747
public ConnectionFactory connectionFactory() {
4848
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(rabbitProperties.getHost());
@@ -54,39 +54,9 @@ public ConnectionFactory connectionFactory() {
5454

5555
@Bean
5656
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
57-
return new RabbitAdmin(connectionFactory);
57+
return new RabbitAdmin(connectionFactory);
5858
}
5959

60-
// public String getQueueName() {
61-
// return rabbitProperties.getTemplate().getExchange() + "_queue";
62-
// }
63-
64-
// @Bean
65-
// public Queue queue() {
66-
// return new Queue(rabbitProperties.getTemplate().getExchange() + "_queue", true);
67-
// }
68-
//
69-
// @Bean
70-
// public Exchange exchange() {
71-
// return switch (exchangeType.toLowerCase()) {
72-
// case "direct" -> new DirectExchange(rabbitProperties.getTemplate().getExchange());
73-
// case "fanout" -> new FanoutExchange(rabbitProperties.getTemplate().getExchange());
74-
// default -> new TopicExchange(rabbitProperties.getTemplate().getExchange());
75-
// };
76-
// }
77-
//
78-
//
79-
// @Bean
80-
// public Binding binding(Queue queue, Exchange exchange) {
81-
// String routingKey = rabbitProperties.getTemplate().getExchange() + "_routing_key";
82-
// return switch (exchange) {
83-
// case TopicExchange topicExchange -> BindingBuilder.bind(queue).to(topicExchange).with(routingKey);
84-
// case DirectExchange directExchange -> BindingBuilder.bind(queue).to(directExchange).with(routingKey);
85-
// case FanoutExchange fanoutExchange -> BindingBuilder.bind(queue).to(fanoutExchange);
86-
// case null, default -> throw new IllegalArgumentException("Unsupported exchange type for binding");
87-
// };
88-
// }
89-
9060
@Bean
9161
public AsyncRabbitTemplate asyncTemplate(ConnectionFactory connectionFactory) {
9262
return new AsyncRabbitTemplate(new RabbitTemplate(connectionFactory));
@@ -98,38 +68,35 @@ public RabbitTemplate template(ConnectionFactory connectionFactory) {
9868
}
9969

10070
@Bean
101-
public MessageListenerContainer addListeners(PersistMessageProcessor persistMessageProcessor, RabbitAdmin rabbitAdmin) {
71+
public MessageListenerContainer addListeners(
72+
PersistMessageProcessor persistMessageProcessor,
73+
RabbitAdmin rabbitAdmin,
74+
ApplicationContext applicationContext) {
10275
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
10376
container.setConnectionFactory(connectionFactory());
104-
105-
// // Change to support multiple listeners
106-
// List<MessageListener> listeners = ReflectionUtils.getAllInstanceOfSubclasses(MessageListener.class);
107-
//
77+
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
10878

10979
// Map to store message type to listener mapping
11080
HashMap<Class<?>, Supplier<MessageListener>> listenerMap = new HashMap<>();
11181

112-
// Find all MessageListener implementations
113-
var listener = ReflectionUtils.getInstanceOfSubclass(MessageListener.class);
114-
if (listener != null) {
82+
// Retrieve all beans of type MessageListener
83+
Map<String, MessageListener> listeners = applicationContext.getBeansOfType(MessageListener.class);
84+
85+
listeners.values().forEach(listener -> {
11586
RabbitmqMessageHandler annotation = listener.getClass().getAnnotation(RabbitmqMessageHandler.class);
11687

11788
if (annotation != null) {
11889
var typeName = annotation.messageType().getTypeName();
11990

120-
DirectExchange directExchange = new DirectExchange(typeName);
121-
rabbitAdmin.declareExchange(directExchange);
91+
Queue queue = declareQueue(rabbitAdmin, typeName);
12292

123-
Queue queue = new Queue(typeName + "_queue", true);
124-
rabbitAdmin.declareQueue(queue);
93+
Exchange exchange = declareExchange(rabbitAdmin);
12594

126-
// Bind the queue to the fanout exchange
127-
rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(directExchange).with(typeName));
95+
declareBindings(rabbitAdmin, queue, exchange, typeName);
12896

129-
// Set the queue name from the annotation
13097
container.setQueueNames(queue.getName());
13198

132-
// Store a supplier for the listener
99+
// Store the listener in the map
133100
listenerMap.put(annotation.messageType(), () -> listener);
134101
}
135102

@@ -141,7 +108,6 @@ public MessageListenerContainer addListeners(PersistMessageProcessor persistMess
141108
PersistMessageEntity persistMessage = persistMessageProcessor.existInboxMessage(id);
142109

143110
if (persistMessage == null) {
144-
145111
Class<?> messageType = ReflectionUtils.findClassFromName(message.getMessageProperties().getType());
146112

147113
// Find the appropriate handler
@@ -162,7 +128,37 @@ public MessageListenerContainer addListeners(PersistMessageProcessor persistMess
162128
return null;
163129
});
164130
});
165-
}
131+
});
166132
return container;
167133
}
134+
135+
private static void declareBindings(RabbitAdmin rabbitAdmin, Queue queue, Exchange exchange, String routingKey) {
136+
switch (exchange) {
137+
case TopicExchange topicExchange -> rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(topicExchange).with(routingKey));
138+
case DirectExchange directExchange -> rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(directExchange).with(routingKey));
139+
case FanoutExchange fanoutExchange -> rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(fanoutExchange));
140+
case null, default -> throw new IllegalArgumentException("Unsupported exchange type for binding");
141+
};
142+
}
143+
144+
private Exchange declareExchange(RabbitAdmin rabbitAdmin) {
145+
Exchange exchange = switch (exchangeType.toLowerCase()) {
146+
case "direct" ->
147+
new DirectExchange(rabbitProperties.getTemplate().getExchange());
148+
case "fanout" ->
149+
new FanoutExchange(rabbitProperties.getTemplate().getExchange());
150+
default ->
151+
new TopicExchange(rabbitProperties.getTemplate().getExchange());
152+
};
153+
154+
rabbitAdmin.declareExchange(exchange);
155+
156+
return exchange;
157+
}
158+
159+
private static Queue declareQueue(RabbitAdmin rabbitAdmin, String typeName) {
160+
Queue queue = new Queue(typeName + "_queue", true);
161+
rabbitAdmin.declareQueue(queue);
162+
return queue;
163+
}
168164
}

src/buildingblocks/src/main/java/buildingblocks/rabbitmq/RabbitmqMessageHandler.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,4 @@
1313
* @return the message type class
1414
*/
1515
Class<?> messageType();
16-
17-
/**
18-
* Specifies the queue name for this message handler.
19-
* @return the queue name
20-
*/
21-
String queueName();
2216
}

src/buildingblocks/src/main/java/buildingblocks/utils/reflection/ReflectionUtils.java

Lines changed: 26 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,16 @@
33
import org.reflections.Reflections;
44
import org.reflections.scanners.SubTypesScanner;
55
import org.reflections.util.ConfigurationBuilder;
6+
import org.springframework.context.ApplicationContext;
7+
68
import java.lang.reflect.Constructor;
7-
import java.lang.reflect.InvocationTargetException;
89
import java.util.ArrayList;
9-
import java.util.Iterator;
1010
import java.util.List;
1111
import java.util.Set;
1212

1313
public final class ReflectionUtils {
14-
public static <T> T getInstanceOfSubclass(Class<T> abstractClass) {
14+
public static <T> T getInstanceOfSubclass(Class<T> abstractClass, ApplicationContext applicationContext) {
1515
try {
16-
// Initialize Reflections with the provided package name
1716
Reflections reflections = new Reflections(new ConfigurationBuilder()
1817
.forPackages("")
1918
.setScanners(new SubTypesScanner(false))
@@ -23,49 +22,53 @@ public static <T> T getInstanceOfSubclass(Class<T> abstractClass) {
2322
Set<Class<? extends T>> subTypes = reflections.getSubTypesOf(abstractClass);
2423

2524
if (!subTypes.isEmpty()) {
26-
// Get the first subclass (you can enhance this to return any specific subclass if needed)
27-
Class<? extends T> firstSubclass = subTypes.iterator().next();
28-
29-
// Create an instance of the subclass using reflection
30-
Constructor<? extends T> constructor = firstSubclass.getDeclaredConstructor();
31-
return constructor.newInstance();
25+
for (Class<? extends T> subType : subTypes) {
26+
// Check if the subclass is managed by Spring
27+
if (applicationContext.containsBean(subType.getSimpleName())) {
28+
// Return the Spring-managed bean
29+
return applicationContext.getBean(subType);
30+
} else {
31+
// Fall back to creating a new instance manually
32+
Constructor<? extends T> constructor = subType.getDeclaredConstructor();
33+
return constructor.newInstance();
34+
}
35+
}
3236
}
3337
return null; // or throw an exception if no subclass is found
3438
} catch (Exception ex) {
3539
throw new RuntimeException("Error occurred while creating an instance of subclass", ex);
3640
}
3741
}
3842

39-
public static <T> List<T> getAllInstanceOfSubclasses(Class<T> abstractClass) {
43+
public static <T> List<T> getAllInstanceOfSubclasses(Class<T> abstractClass, ApplicationContext applicationContext) {
4044
List<T> instances = new ArrayList<>();
4145
try {
42-
// Initialize Reflections with the provided package name
4346
Reflections reflections = new Reflections(new ConfigurationBuilder()
4447
.forPackages("")
4548
.setScanners(new SubTypesScanner(false))
4649
);
4750

48-
// Get all subclasses of the abstract class
4951
Set<Class<? extends T>> subTypes = reflections.getSubTypesOf(abstractClass);
5052

51-
if (!subTypes.isEmpty()) {
52-
// Get the first subclass (you can enhance this to return any specific subclass if needed)
53-
54-
for (Class<? extends T> subType : subTypes) {
55-
try {
56-
// Create a new instance using the no-argument constructor
53+
for (Class<? extends T> subType : subTypes) {
54+
try {
55+
// Check if the class is managed by Spring
56+
if (applicationContext.containsBean(subType.getSimpleName())) {
57+
T instance = applicationContext.getBean(subType);
58+
instances.add(instance);
59+
} else {
60+
// Fall back to no-argument constructor
5761
T instance = subType.getDeclaredConstructor().newInstance();
5862
instances.add(instance);
59-
} catch (InstantiationException | IllegalAccessException |
60-
NoSuchMethodException | InvocationTargetException ex) {
61-
throw new RuntimeException("Error occurred while creating an instance of subclasses", ex);
6263
}
64+
} catch (Exception ex) {
65+
throw new RuntimeException("Error occurred while creating an instance of subclasses", ex);
6366
}
6467
}
65-
return instances;
6668
} catch (Exception ex) {
6769
throw new RuntimeException("Error occurred while creating an instance of subclass", ex);
6870
}
71+
return instances;
6972
}
7073

7174
public static Class<?> findClassFromName(String className) {

0 commit comments

Comments
 (0)