Skip to content

Commit ee98762

Browse files
authored
Merge pull request #29 from meysamhadeli/refactor/refactor-rabbitmq-listener-configuration
refactor: refactor rabbitmq listener configuration
2 parents 5f77401 + 2042354 commit ee98762

File tree

10 files changed

+158
-133
lines changed

10 files changed

+158
-133
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,8 @@
5454
- :sparkle: Using `OpenTelemetry Collector` for collecting `Metrics`, `Tracings` and `Structured Logs`.
5555
- :sparkle: Using `Kibana` for `Logging` top of `OpenTelemetry Collector`.
5656
- :sparkle: Using `Jaeger` for `Distributed Tracing` top of `OpenTelemetry Collector`.
57-
- :sparkle: Using `OpenTelemetry` for monitoring on top of `Prometheus` and `Grafana`.
58-
- :sparkle: Using `Keycloak` for authentication and authorization base on `OpenID-Connect` and `OAuth2`.
57+
- :sparkle: Using `Prometheus` and `Grafana` for `monitoring` top of `OpenTelemetry Collector`.
58+
- :sparkle: Using `Keycloak` for `authentication` and `authorization` base on `OpenID-Connect` and `OAuth2`.
5959
- :sparkle: Using `Spring Cloud Gateway` as a microservices `gateway`.
6060

6161

booking.rest

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,9 @@ POST {{keycloak-api}}/realms/keycloak-realm/protocol/openid-connect/token
2525
Content-Type: application/x-www-form-urlencoded
2626

2727
grant_type=client_credentials
28-
&client_id=flight-client-credentials
29-
&client_secret=Glcz8z8E6alTIfay1GdMc6XTTeuZrgOs
30-
&scope=openid
28+
&client_id=booking-client-credentials
29+
&client_secret=secret
30+
&scope=openid
3131
###
3232

3333

src/buildingblocks/src/main/java/buildingblocks/keycloak/CustomAuthenticationEntryPoint.java

Lines changed: 37 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import buildingblocks.utils.jsonconverter.JsonConverterUtils;
44
import jakarta.servlet.http.HttpServletRequest;
55
import jakarta.servlet.http.HttpServletResponse;
6+
import org.slf4j.Logger;
67
import org.springframework.http.HttpStatus;
78
import org.springframework.http.MediaType;
89
import org.springframework.http.ProblemDetail;
@@ -19,30 +20,39 @@
1920
@Component
2021
public class CustomAuthenticationEntryPoint implements AuthenticationEntryPoint {
2122

22-
@Override
23-
public void commence(HttpServletRequest request, HttpServletResponse response, AuthenticationException authException)
24-
throws IOException {
25-
26-
ProblemDetail problemDetail = ProblemDetail.forStatusAndDetail(
27-
getStatus(authException),
28-
authException.getMessage()
29-
);
30-
31-
problemDetail.setTitle(authException.getClass().getSimpleName());
32-
problemDetail.setDetail(authException.getMessage());
33-
problemDetail.setType(URI.create("https://problems/"+ authException.getClass().getSimpleName().toLowerCase()));
34-
problemDetail.setProperty("timestamp", Instant.now().toString());
35-
problemDetail.setInstance(URI.create(request.getRequestURI()));
36-
response.setStatus(getStatus(authException).value());
37-
response.setContentType(MediaType.APPLICATION_JSON_VALUE);
38-
response.getWriter().write(JsonConverterUtils.serializeObject(problemDetail));
39-
}
40-
41-
private HttpStatus getStatus(AuthenticationException exception) {
42-
return switch (exception) {
43-
case BadCredentialsException e -> HttpStatus.UNAUTHORIZED;
44-
case InsufficientAuthenticationException e -> HttpStatus.FORBIDDEN;
45-
default -> HttpStatus.UNAUTHORIZED;
46-
};
47-
}
48-
}
23+
private final Logger logger;
24+
25+
public CustomAuthenticationEntryPoint(Logger logger) {
26+
this.logger = logger;
27+
}
28+
29+
@Override
30+
public void commence(HttpServletRequest request, HttpServletResponse response, AuthenticationException authException)
31+
throws IOException {
32+
33+
ProblemDetail problemDetail = ProblemDetail.forStatusAndDetail(
34+
getStatus(authException),
35+
authException.getMessage()
36+
);
37+
38+
problemDetail.setTitle(authException.getClass().getSimpleName());
39+
problemDetail.setDetail(authException.getMessage());
40+
problemDetail.setType(URI.create("https://problems/" + authException.getClass().getSimpleName().toLowerCase()));
41+
problemDetail.setProperty("timestamp", Instant.now().toString());
42+
problemDetail.setInstance(URI.create(request.getRequestURI()));
43+
response.setStatus(getStatus(authException).value());
44+
response.setContentType(MediaType.APPLICATION_JSON_VALUE);
45+
response.getWriter().write(JsonConverterUtils.serializeObject(problemDetail));
46+
47+
// Log structured error details
48+
logger.atError().addKeyValue("details", JsonConverterUtils.serializeObject(problemDetail)).log("An error occurred while processing the request.");
49+
}
50+
51+
private HttpStatus getStatus(AuthenticationException exception) {
52+
return switch (exception) {
53+
case BadCredentialsException e -> HttpStatus.UNAUTHORIZED;
54+
case InsufficientAuthenticationException e -> HttpStatus.FORBIDDEN;
55+
default -> HttpStatus.UNAUTHORIZED;
56+
};
57+
}
58+
}

src/buildingblocks/src/main/java/buildingblocks/keycloak/KeycloakConfiguration.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package buildingblocks.keycloak;
22

3+
import org.slf4j.Logger;
34
import org.springframework.beans.factory.annotation.Value;
45
import org.springframework.context.annotation.Bean;
56
import org.springframework.context.annotation.Configuration;
@@ -26,10 +27,10 @@ public class KeycloakConfiguration {
2627
private String jwkSetUri;
2728

2829
@Bean
29-
public SecurityFilterChain securityFilterChain(HttpSecurity httpSecurity) throws Exception {
30+
public SecurityFilterChain securityFilterChain(HttpSecurity httpSecurity, Logger logger) throws Exception {
3031
return httpSecurity
3132
.exceptionHandling(exception -> {
32-
exception.authenticationEntryPoint(new CustomAuthenticationEntryPoint());
33+
exception.authenticationEntryPoint(new CustomAuthenticationEntryPoint(logger));
3334
})
3435
.cors(Customizer.withDefaults())
3536
.csrf(csrf -> csrf.disable())
@@ -62,4 +63,4 @@ private JwtAuthenticationConverter jwtAuthenticationConverter() {
6263
converter.setJwtGrantedAuthoritiesConverter(combinedConverter);
6364
return converter;
6465
}
65-
}
66+
}

src/buildingblocks/src/main/java/buildingblocks/problemdetails/CustomProblemDetailsHandler.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,11 @@
44
import buildingblocks.core.exception.ConflictException;
55
import buildingblocks.core.exception.NotFoundException;
66
import buildingblocks.core.exception.ValidationException;
7+
import buildingblocks.utils.jsonconverter.JsonConverterUtils;
78
import jakarta.persistence.OptimisticLockException;
9+
import org.slf4j.Logger;
10+
import org.slf4j.LoggerFactory;
11+
import org.slf4j.event.KeyValuePair;
812
import org.springframework.core.env.Environment;
913
import org.springframework.http.HttpStatus;
1014
import org.springframework.http.ProblemDetail;
@@ -16,11 +20,14 @@
1620

1721
import java.net.URI;
1822
import java.time.Instant;
23+
import java.util.Map;
24+
import java.util.Objects;
1925

2026
@RestControllerAdvice
2127
public class CustomProblemDetailsHandler extends ResponseEntityExceptionHandler {
2228

2329
private final Environment environment;
30+
private final Logger logger = LoggerFactory.getLogger(CustomProblemDetailsHandler.class);
2431

2532
public CustomProblemDetailsHandler(Environment environment) {
2633
this.environment = environment;
@@ -35,7 +42,6 @@ record ExceptionDetails(String detail, String title, HttpStatus status) {
3542
}
3643

3744
ExceptionDetails details = switch (ex) {
38-
3945
case ConflictException conflictEx -> new ExceptionDetails(
4046
conflictEx.getMessage(),
4147
conflictEx.getClass().getSimpleName(),
@@ -84,7 +90,6 @@ record ExceptionDetails(String detail, String title, HttpStatus status) {
8490
details.detail()
8591
);
8692

87-
8893
problemDetail.setType(URI.create("https://problems/" + details.title().toLowerCase()));
8994
problemDetail.setTitle(details.title());
9095
problemDetail.setProperty("timestamp", Instant.now());
@@ -94,6 +99,9 @@ record ExceptionDetails(String detail, String title, HttpStatus status) {
9499
problemDetail.setProperty("exception", ex.toString());
95100
}
96101

102+
// Log structured error details
103+
logger.atError().addKeyValue("details", JsonConverterUtils.serializeObject(details)).log("An error occurred while processing the request.");
104+
97105
return ResponseEntity.status(problemDetail.getStatus()).body(problemDetail);
98106
}
99107

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package buildingblocks.rabbitmq;
2+
3+
import buildingblocks.utils.jsonconverter.JsonConverterUtils;
4+
import buildingblocks.utils.reflection.ReflectionUtils;
5+
import org.springframework.amqp.core.Message;
6+
import org.springframework.amqp.core.MessageListener;
7+
8+
public interface MessageHandler<T> extends MessageListener {
9+
/**
10+
* Handles the message of the specified type.
11+
*/
12+
void onMessage(T message);
13+
14+
/**
15+
* Default implementation of onMessage to delegate to the consume method.
16+
*/
17+
@Override
18+
default void onMessage(Message message) {
19+
try {
20+
Class<T> messageType = ReflectionUtils.getGenericTypeParameter(this.getClass(), MessageHandler.class);
21+
T deserializedMessage = JsonConverterUtils.deserialize(message.getBody(), messageType);
22+
onMessage(deserializedMessage);
23+
} catch (Exception ex) {
24+
throw new RuntimeException("Failed to process message", ex);
25+
}
26+
}
27+
}

src/buildingblocks/src/main/java/buildingblocks/rabbitmq/RabbitmqConfiguration.java

Lines changed: 41 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -77,58 +77,58 @@ public MessageListenerContainer addListeners(
7777
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
7878

7979
// Map to store message type to listener mapping
80-
HashMap<Class<?>, Supplier<MessageListener>> listenerMap = new HashMap<>();
80+
HashMap<Class<?>, Supplier<MessageHandler<?>>> listenerMap = new HashMap<>();
8181

82-
// Retrieve all beans of type MessageListener
83-
Map<String, MessageListener> listeners = applicationContext.getBeansOfType(MessageListener.class);
82+
// Retrieve all beans of type MessageHandler
83+
Map<String, MessageHandler> listeners = applicationContext.getBeansOfType(MessageHandler.class);
8484

8585
listeners.values().forEach(listener -> {
86-
RabbitmqMessageHandler annotation = listener.getClass().getAnnotation(RabbitmqMessageHandler.class);
86+
// Infer the message type from the listener instance
87+
Class<?> messageType = ReflectionUtils.getGenericTypeParameter(listener.getClass(), MessageHandler.class);
88+
String typeName = messageType.getTypeName();
8789

88-
if (annotation != null) {
89-
var typeName = annotation.messageType().getTypeName();
90+
// Declare queue, exchange, and bindings
91+
Queue queue = declareQueue(rabbitAdmin, typeName);
92+
Exchange exchange = declareExchange(rabbitAdmin);
93+
declareBindings(rabbitAdmin, queue, exchange, typeName);
9094

91-
Queue queue = declareQueue(rabbitAdmin, typeName);
95+
// Add the queue to the container
96+
container.setQueueNames(queue.getName());
9297

93-
Exchange exchange = declareExchange(rabbitAdmin);
94-
95-
declareBindings(rabbitAdmin, queue, exchange, typeName);
96-
97-
container.setQueueNames(queue.getName());
98-
99-
// Store the listener in the map
100-
listenerMap.put(annotation.messageType(), () -> listener);
101-
}
102-
103-
// Set the message listener
104-
container.setMessageListener(message -> {
105-
transactionTemplate.execute(status -> {
106-
try {
107-
UUID id = persistMessageProcessor.addReceivedMessage(message);
108-
PersistMessageEntity persistMessage = persistMessageProcessor.existInboxMessage(id);
109-
110-
if (persistMessage == null) {
111-
Class<?> messageType = ReflectionUtils.findClassFromName(message.getMessageProperties().getType());
112-
113-
// Find the appropriate handler
114-
Supplier<MessageListener> handlerSupplier = listenerMap.get(messageType);
98+
// Store the listener in the map
99+
listenerMap.put(messageType, () -> listener);
100+
});
115101

116-
if (handlerSupplier != null) {
117-
MessageListener handler = handlerSupplier.get();
118-
handler.onMessage(message);
119-
persistMessageProcessor.process(id, MessageDeliveryType.Inbox);
120-
} else {
121-
logger.warn("No handler found for message type: {}", messageType.getTypeName());
122-
}
102+
// Set the message listener
103+
container.setMessageListener(message -> {
104+
transactionTemplate.execute(status -> {
105+
try {
106+
UUID id = persistMessageProcessor.addReceivedMessage(message);
107+
PersistMessageEntity persistMessage = persistMessageProcessor.existInboxMessage(id);
108+
109+
if (persistMessage == null) {
110+
// Infer the message type from the message properties
111+
Class<?> messageType = ReflectionUtils.findClassFromName(message.getMessageProperties().getType());
112+
113+
// Find the appropriate handler
114+
Supplier<MessageHandler<?>> handlerSupplier = listenerMap.get(messageType);
115+
116+
if (handlerSupplier != null) {
117+
MessageHandler<?> handler = handlerSupplier.get();
118+
handler.onMessage(message); // This will delegate to the consume method
119+
persistMessageProcessor.process(id, MessageDeliveryType.Inbox);
120+
} else {
121+
logger.warn("No handler found for message type: {}", messageType.getTypeName());
123122
}
124-
} catch (Exception ex) {
125-
status.setRollbackOnly();
126-
throw ex;
127123
}
128-
return null;
129-
});
124+
} catch (Exception ex) {
125+
status.setRollbackOnly();
126+
throw ex;
127+
}
128+
return null;
130129
});
131130
});
131+
132132
return container;
133133
}
134134

src/buildingblocks/src/main/java/buildingblocks/rabbitmq/RabbitmqMessageHandler.java

Lines changed: 0 additions & 16 deletions
This file was deleted.

0 commit comments

Comments
 (0)