Skip to content

Commit bdd0c44

Browse files
ravicm2maihackehohwille
authored
JWT Authentication support for devon4j-kafka (#264)
* initial commit * fix formatting * Move kafka-jwt-auth to devon4j-security-jwt * Change order of logging and token validation * removed * Fix annotation packagename * Acknowledge msg with invalid token * Include auto config for jwt auth aspect for kafka * added loggers for retry * log message key instead of contents * changed from debug to info * aspect junits * wip-integration tests * initial commit * fix formatting * Move kafka-jwt-auth to devon4j-security-jwt * Change order of logging and token validation * removed * Fix annotation packagename * Acknowledge msg with invalid token * Include auto config for jwt auth aspect for kafka * added loggers for retry * log message key instead of contents * changed from debug to info * aspect junits * wip-integration tests * wip integration test * finished integrated test and added resources * missing test case and javadocs * remove prefix from token * added access control confog. build fix * fix indentation * Add documentation * improve doc Co-authored-by: Simon Spielmann <[email protected]> Co-authored-by: Simon Spielmann <[email protected]> Co-authored-by: Jörg Hohwiller <[email protected]>
1 parent c7509e2 commit bdd0c44

File tree

32 files changed

+747
-45
lines changed

32 files changed

+747
-45
lines changed

documentation/guide-jwt.asciidoc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,3 +103,7 @@ To allow a client to login with username and password to get a JWT for sub-seque
103103
http.addFilterBefore(getJwtLoginFilter(), UsernamePasswordAuthenticationFilter.class);
104104
}
105105
----
106+
107+
== Authentication with Kafka
108+
109+
Authentication with JWT and Kafka is explained in the link:guide-kafka.asciidoc[Kafka guide].

documentation/guide-kafka.asciidoc

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -314,6 +314,37 @@ messaging.kafka.health.topicsToCheck=employeeapp-employee-v1-delete,employeeapp-
314314

315315
These properties are provided with default values except the topicsToCheck and health check will do happen only when the property is `management.endpoint.health.enabled=true`.
316316

317+
== Authentication
318+
319+
=== JSON Web Token (JWT)
320+
321+
devon4j-kafka supports authentication via JSON Web Tokens (JWT) out-of-the-box.
322+
To use it add a dependency to the devon4j-starter-security-jwt:
323+
324+
[source,xml]
325+
-----
326+
<dependency>
327+
<groupId>com.devonfw.java.starters</groupId>
328+
<artifactId>devon4j-starter-security-jwt</artifactId>
329+
</dependency>
330+
-----
331+
332+
The authentication via JWT needs some configuration, e.g. a keystore to verify the token signature. This is explained in the link:guide-jwt.asciidoc[JWT documentation].
333+
334+
To secure a message listener with jwt add the `@JwtAuthentication`:
335+
336+
[source,java]
337+
-----
338+
@JwtAuthentication
339+
@KafkaListener(topics = "employeeapp-employee-v1-delete", groupId = "${messaging.kafka.consumer.groupId}")
340+
public void consumer(ConsumerRecord<K, V> consumerRecord, Acknowledgment acknowledgment) {
341+
...
342+
}
343+
}
344+
-----
345+
346+
With this annotation in-place each message will be checked for a valid JWT in a message header with the name `Authorization`. If a valid annotation is found the spring security context will be initialized with the user roles and "normal" authorization e.g. with `@RolesAllowed` may be used. This is also demonstrated in the kafka sample application.
347+
317348
== Using Kafka for internal parallel processing
318349
Apart from the use of Kafka as "communication channel" it sometimes helpful to use Kafka internally to do parallel processing:
319350

modules/kafka/pom.xml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
<artifactId>devon4j-logging</artifactId>
3434
</dependency>
3535

36-
3736
<dependency>
3837
<groupId>org.springframework.kafka</groupId>
3938
<artifactId>spring-kafka-test</artifactId>

modules/kafka/src/main/java/com/devonfw/module/kafka/common/messaging/api/config/MessageReceiverConfig.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,14 +64,18 @@ public KafkaListenerContainerProperties messageKafkaListenerContainerProperties(
6464
/**
6565
* Creates the bean for {@link MessageListenerLoggingAspect}.
6666
*
67+
* @param <K> the key type
68+
* @param <V> the value type
69+
*
6770
* @param messageSpanExtractor the {@link MessageSpanExtractor}
6871
*
6972
* @return the {@link MessageListenerLoggingAspect}.
7073
*/
7174
@Bean
72-
public MessageListenerLoggingAspect messageListenerLoggingAspect(MessageSpanExtractor messageSpanExtractor) {
75+
public <K, V> MessageListenerLoggingAspect<K, V> messageListenerLoggingAspect(
76+
MessageSpanExtractor<K, V> messageSpanExtractor) {
7377

74-
MessageListenerLoggingAspect messageListenerLoggingAspect = new MessageListenerLoggingAspect();
78+
MessageListenerLoggingAspect<K, V> messageListenerLoggingAspect = new MessageListenerLoggingAspect<>();
7579
messageListenerLoggingAspect.setTracer(this.tracer);
7680
messageListenerLoggingAspect.setSpanExtractor(messageSpanExtractor);
7781
return messageListenerLoggingAspect;

modules/kafka/src/main/java/com/devonfw/module/kafka/common/messaging/logging/impl/EventKey.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ public enum EventKey {
88
/**
99
* MESSAGE_SENT_SUCCESSFULLY
1010
*/
11-
MESSAGE_SENT_SUCCESSFULLY("The message {} ​​was placed in the topic {}, partition {}, offset {}."),
11+
MESSAGE_SENT_SUCCESSFULLY("The message with ID {} ​​was placed in the topic {}, partition {}, offset {}."),
1212

1313
/**
1414
* MESSAGE_NOT_SENT

modules/kafka/src/main/java/com/devonfw/module/kafka/common/messaging/logging/impl/MessageListenerLoggingAspect.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,13 @@
3030
/**
3131
* This aspect class is used to listen the {@link KafkaListener} and to log the {@link ConsumerRecord} received.
3232
*
33+
* @param <K> the key type
34+
* @param <V> the value type
35+
*
3336
*/
3437
@Aspect
3538
@Order(0)
36-
public class MessageListenerLoggingAspect {
39+
public class MessageListenerLoggingAspect<K, V> {
3740

3841
private static final Logger LOG = LoggerFactory.getLogger(MessageListenerLoggingAspect.class);
3942

@@ -51,7 +54,7 @@ public class MessageListenerLoggingAspect {
5154
/**
5255
* The {@link MessageSpanExtractor}.
5356
*/
54-
protected MessageSpanExtractor spanExtractor;
57+
protected MessageSpanExtractor<K, V> spanExtractor;
5558

5659
/**
5760
* Set the {@link Tracer}.
@@ -68,7 +71,7 @@ public void setTracer(Tracer tracer) {
6871
*
6972
* @param spanExtractor .
7073
*/
71-
public void setSpanExtractor(MessageSpanExtractor spanExtractor) {
74+
public void setSpanExtractor(MessageSpanExtractor<K, V> spanExtractor) {
7275

7376
this.spanExtractor = spanExtractor;
7477
}
@@ -83,8 +86,7 @@ public void setSpanExtractor(MessageSpanExtractor spanExtractor) {
8386
* @throws Throwable the {@link Throwable}
8487
*/
8588
@Around("@annotation(org.springframework.kafka.annotation.KafkaListener) && args(kafkaRecord,..)")
86-
public Object logMessageProcessing(ProceedingJoinPoint call, ConsumerRecord<Object, Object> kafkaRecord)
87-
throws Throwable {
89+
public Object logMessageProcessing(ProceedingJoinPoint call, ConsumerRecord<K, V> kafkaRecord) throws Throwable {
8890

8991
openSpan(kafkaRecord);
9092

@@ -116,7 +118,7 @@ public Object logMessageProcessing(ProceedingJoinPoint call, ConsumerRecord<Obje
116118
}
117119
}
118120

119-
private void openSpan(ConsumerRecord<Object, Object> kafkaRecord) {
121+
private void openSpan(ConsumerRecord<K, V> kafkaRecord) {
120122

121123
if (ObjectUtils.isEmpty(this.tracer)) {
122124
return;
@@ -131,7 +133,7 @@ private void openSpan(ConsumerRecord<Object, Object> kafkaRecord) {
131133
(span.context().parentId() != null ? toLowerHex(span.context().parentId()) : "null"));
132134
}
133135

134-
private long determineLengthOfStayInTopic(ConsumerRecord<Object, Object> kafkaRecord) {
136+
private long determineLengthOfStayInTopic(ConsumerRecord<K, V> kafkaRecord) {
135137

136138
if (kafkaRecord.timestampType() != TimestampType.NO_TIMESTAMP_TYPE) {
137139
return Instant.now().toEpochMilli() - kafkaRecord.timestamp();

modules/kafka/src/main/java/com/devonfw/module/kafka/common/messaging/logging/impl/MessageLoggingSupport.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,15 @@ public class MessageLoggingSupport {
1111
* This method is used to log event when message is sent successfully.
1212
*
1313
* @param logger the {@link Logger}
14-
* @param value the message value.
14+
* @param key the message key.
1515
* @param topic the topic of the message.
1616
* @param partition the partition
1717
* @param offset the offset
1818
*
1919
*/
20-
public void logMessageSent(Logger logger, String value, String topic, Integer partition, Long offset) {
20+
public void logMessageSent(Logger logger, String key, String topic, Integer partition, Long offset) {
2121

22-
logger.info(EventKey.MESSAGE_SENT_SUCCESSFULLY.getMessage(), value, topic, partition, offset);
22+
logger.info(EventKey.MESSAGE_SENT_SUCCESSFULLY.getMessage(), key, topic, partition, offset);
2323
}
2424

2525
/**

modules/kafka/src/main/java/com/devonfw/module/kafka/common/messaging/logging/impl/ProducerLoggingListener.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,16 @@ public ProducerLoggingListener(MessageLoggingSupport loggingSupport) {
3232
@Override
3333
public void onSuccess(String topic, Integer partition, Object key, Object value, RecordMetadata recordMetadata) {
3434

35+
String messageKey = "<no message key>";
36+
if (key != null) {
37+
key = key.toString();
38+
}
3539
if (recordMetadata != null) {
36-
this.loggingSupport.logMessageSent(LOG, value.toString(), recordMetadata.topic(), recordMetadata.partition(),
40+
this.loggingSupport.logMessageSent(LOG, messageKey, recordMetadata.topic(), recordMetadata.partition(),
3741
recordMetadata.offset());
3842

3943
} else {
40-
this.loggingSupport.logMessageSent(LOG, value.toString(), topic, partition, null);
44+
this.loggingSupport.logMessageSent(LOG, messageKey, topic, partition, null);
4145
}
4246
}
4347

modules/kafka/src/main/java/com/devonfw/module/kafka/common/messaging/retry/impl/DefaultRetryPolicy.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,9 @@ public boolean canRetry(ConsumerRecord<K, V> consumerRecord, MessageRetryContext
7777
throw new IllegalArgumentException("The \"ex \" parameter cannot be null.");
7878
}
7979

80+
LOG.info("proceeding with retry for the message {} and due to {}", consumerRecord.value().toString(),
81+
ex.getMessage());
82+
8083
if (retryContext != null && retryContext.getRetryUntil() != null
8184
&& retryContext.getCurrentRetryCount() < this.retryCount) {
8285
return canRetry(retryContext, ex);

modules/kafka/src/main/java/com/devonfw/module/kafka/common/messaging/trace/api/config/TraceConfig.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,16 @@ public MessageSpanInjector messageSpanInjector() {
2727

2828
/**
2929
* Creates bean for the {@link MessageSpanExtractor}
30+
*
31+
* @param <K> the key type
32+
* @param <V> the value type
3033
*
3134
* @return the {@link MessageSpanExtractor}
3235
*/
3336
@Bean
34-
public MessageSpanExtractor messageSpanExtractor() {
37+
public <K, V> MessageSpanExtractor<K, V> messageSpanExtractor() {
3538

36-
return new MessageSpanExtractor();
39+
return new MessageSpanExtractor<>();
3740
}
3841

3942
}

0 commit comments

Comments
 (0)