Skip to content

Commit 43d2870

Browse files
BAEL-9070 Implement SASL PLAIN Authentication in Kafka (#18639)
* opentelemtry for spring boot 3 * refactoring * fixed lint issues * refactoring * remove all deprecated code related to spring cloud sleuth * include version property * include version property * move spring-kafka article code to this module * remove code related to SASL article * sasl plaintext implemented * refactor the sasl config and test * removed unrelated code * removed unrelated code * removed unrelated code * implement SASL Plain * refactor the configs * remove the lombok dependency and spring test * refactoring error fixes * fix indentation in jaas configs * fix method typo --------- Co-authored-by: Liam Williams <[email protected]>
1 parent 20038c0 commit 43d2870

20 files changed

+370
-1
lines changed

spring-kafka-4/pom.xml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
<groupId>org.springframework.boot</groupId>
3232
<artifactId>spring-boot-starter-actuator</artifactId>
3333
</dependency>
34-
3534
<dependency>
3635
<groupId>org.apache.avro</groupId>
3736
<artifactId>avro</artifactId>
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package com.baeldung.sasl;
2+
3+
import org.apache.kafka.clients.consumer.ConsumerRecord;
4+
import org.slf4j.Logger;
5+
import org.slf4j.LoggerFactory;
6+
import org.springframework.kafka.annotation.KafkaListener;
7+
import org.springframework.stereotype.Component;
8+
9+
import java.util.ArrayList;
10+
import java.util.List;
11+
12+
@Component
13+
public class KafkaConsumer {
14+
15+
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);
16+
public static final String TOPIC = "test-topic";
17+
public final List<String> messages = new ArrayList<>();
18+
19+
@KafkaListener(topics = TOPIC)
20+
public void receive(ConsumerRecord<String, String> consumerRecord) {
21+
LOGGER.info("Received payload: '{}'", consumerRecord.toString());
22+
messages.add(consumerRecord.value());
23+
}
24+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package com.baeldung.sasl;
2+
3+
import org.springframework.boot.SpringApplication;
4+
import org.springframework.boot.autoconfigure.SpringBootApplication;
5+
6+
@SpringBootApplication
7+
public class KafkaSaslApplication {
8+
9+
public static void main(String[] args) {
10+
System.setProperty("spring.config.name", "application-sasl");
11+
SpringApplication.run(KafkaSaslApplication.class, args);
12+
}
13+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package com.baeldung.saslplaintext;
2+
3+
import org.apache.kafka.clients.consumer.ConsumerRecord;
4+
import org.slf4j.Logger;
5+
import org.slf4j.LoggerFactory;
6+
import org.springframework.kafka.annotation.KafkaListener;
7+
import org.springframework.stereotype.Component;
8+
9+
import java.util.ArrayList;
10+
import java.util.List;
11+
12+
@Component
13+
public class KafkaConsumer {
14+
15+
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);
16+
public static final String TOPIC = "test-topic";
17+
public final List<String> messages = new ArrayList<>();
18+
19+
@KafkaListener(topics = TOPIC)
20+
public void receive(ConsumerRecord<String, String> consumerRecord) {
21+
LOGGER.info("Received payload: '{}'", consumerRecord.toString());
22+
messages.add(consumerRecord.value());
23+
}
24+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package com.baeldung.saslplaintext;
2+
3+
import org.slf4j.Logger;
4+
import org.slf4j.LoggerFactory;
5+
import org.springframework.kafka.core.KafkaTemplate;
6+
import org.springframework.stereotype.Component;
7+
8+
@Component
9+
public class KafkaProducer {
10+
11+
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class);
12+
private final KafkaTemplate<String, String> kafkaTemplate;
13+
14+
public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
15+
this.kafkaTemplate = kafkaTemplate;
16+
}
17+
18+
public void sendMessage(String message, String topic) {
19+
LOGGER.info("Producing message: {}", message);
20+
kafkaTemplate.send(topic, "key", message)
21+
.whenComplete((result, ex) -> {
22+
if (ex == null) {
23+
LOGGER.info("Message sent to topic: {}", message);
24+
} else {
25+
LOGGER.error("Failed to send message", ex);
26+
}
27+
});
28+
}
29+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package com.baeldung.saslplaintext;
2+
3+
import org.springframework.boot.SpringApplication;
4+
import org.springframework.boot.autoconfigure.SpringBootApplication;
5+
6+
@SpringBootApplication
7+
public class KafkaSaslPlaintextApplication {
8+
9+
public static void main(String[] args) {
10+
System.setProperty("spring.config.name", "application-sasl-plaintext");
11+
SpringApplication.run(KafkaSaslPlaintextApplication.class, args);
12+
}
13+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
spring:
2+
kafka:
3+
bootstrap-servers: localhost:9092
4+
properties:
5+
sasl.mechanism: PLAIN
6+
sasl.jaas.config: >
7+
org.apache.kafka.common.security.plain.PlainLoginModule required
8+
username="user1"
9+
password="user1-secret";
10+
security:
11+
protocol: SASL_PLAINTEXT
12+
consumer:
13+
group-id: test-group
14+
auto-offset-reset: earliest
15+
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
16+
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
spring:
2+
kafka:
3+
bootstrap-servers: localhost:9092
4+
properties:
5+
sasl.mechanism: GSSAPI
6+
sasl.jaas.config: >
7+
com.sun.security.auth.module.Krb5LoginModule required
8+
useKeyTab=true
9+
storeKey=true
10+
keyTab="./src/test/resources/sasl/keytabs/client.keytab"
11+
principal="[email protected]"
12+
serviceName="kafka";
13+
security:
14+
protocol: "SASL_PLAINTEXT"
15+
consumer:
16+
group-id: test
17+
auto-offset-reset: earliest
18+
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
19+
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package com.baeldung.sasl;
2+
3+
import org.junit.jupiter.api.Test;
4+
import org.junit.jupiter.api.extension.ExtendWith;
5+
import org.springframework.boot.test.context.SpringBootTest;
6+
import org.springframework.test.context.junit.jupiter.SpringExtension;
7+
8+
@ExtendWith(SpringExtension.class)
9+
@SpringBootTest(classes = KafkaSaslApplication.class)
10+
class SpringContextTest {
11+
12+
@Test
13+
void whenSpringContextIsBootstrapped_thenNoExceptions() {
14+
}
15+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package com.baeldung.saslplaintext;
2+
3+
import org.junit.jupiter.api.Test;
4+
import org.springframework.beans.factory.annotation.Autowired;
5+
import org.springframework.boot.test.context.SpringBootTest;
6+
import org.springframework.test.context.ActiveProfiles;
7+
import org.testcontainers.containers.DockerComposeContainer;
8+
import org.testcontainers.containers.wait.strategy.Wait;
9+
import org.testcontainers.junit.jupiter.Container;
10+
import org.testcontainers.junit.jupiter.Testcontainers;
11+
12+
import java.io.File;
13+
import java.time.Duration;
14+
import java.util.UUID;
15+
16+
import static com.baeldung.saslplaintext.KafkaConsumer.TOPIC;
17+
import static org.assertj.core.api.Assertions.assertThat;
18+
import static org.awaitility.Awaitility.await;
19+
20+
@Testcontainers
21+
@ActiveProfiles("sasl-plaintext")
22+
@SpringBootTest(classes = KafkaSaslPlaintextApplication.class)
23+
class KafkaSaslPlaintextApplicationLiveTest {
24+
25+
private static final File KAFKA_COMPOSE_FILE = new File("src/test/resources/sasl-plaintext/docker-compose.yml");
26+
private static final String KAFKA_SERVICE = "kafka";
27+
private static final int SASL_PORT = 9092;
28+
29+
@Container
30+
public DockerComposeContainer<?> container =
31+
new DockerComposeContainer<>(KAFKA_COMPOSE_FILE)
32+
.withExposedService(KAFKA_SERVICE, SASL_PORT, Wait.forListeningPort());
33+
34+
@Autowired
35+
private KafkaProducer kafkaProducer;
36+
37+
@Autowired
38+
private KafkaConsumer kafkaConsumer;
39+
40+
@Test
41+
void givenSaslIsConfigured_whenProducerSendsMessageOverSasl_thenConsumerReceivesOverSasl() {
42+
String message = UUID.randomUUID().toString();
43+
kafkaProducer.sendMessage(message, TOPIC);
44+
45+
await().atMost(Duration.ofMinutes(2))
46+
.untilAsserted(() -> assertThat(kafkaConsumer.messages).containsExactly(message));
47+
}
48+
}

0 commit comments

Comments
 (0)