Skip to content

Commit 8c2e02d

Browse files
authored
Add SSL support to data-generator (#70)
* Add slf4j to data-generator * Add SSL to data-generator * Refactor slf4j version to property
1 parent 6b1c3a0 commit 8c2e02d

File tree

6 files changed

+71
-30
lines changed

6 files changed

+71
-30
lines changed

tutorials/data-generator/pom.xml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
<properties>
1515
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
1616
<maven.compiler.release>17</maven.compiler.release>
17+
<slf4j.version>2.0.17</slf4j.version>
1718
</properties>
1819

1920
<dependencies>
@@ -27,6 +28,16 @@
2728
<artifactId>apicurio-registry-serdes-avro-serde</artifactId>
2829
<version>${apicurio-registry.version}</version>
2930
</dependency>
31+
<dependency>
32+
<groupId>org.slf4j</groupId>
33+
<artifactId>slf4j-api</artifactId>
34+
<version>${slf4j.version}</version>
35+
</dependency>
36+
<dependency>
37+
<groupId>org.slf4j</groupId>
38+
<artifactId>slf4j-simple</artifactId>
39+
<version>${slf4j.version}</version>
40+
</dependency>
3041
</dependencies>
3142

3243
<build>

tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/DataGenerator.java

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,27 +16,17 @@
1616
import java.util.function.Function;
1717
import java.util.function.Supplier;
1818

19-
import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
20-
import static org.apache.kafka.clients.CommonClientConfigs.CLIENT_ID_CONFIG;
21-
import static org.apache.kafka.clients.CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG;
2219
import static org.apache.kafka.common.config.TopicConfig.RETENTION_MS_CONFIG;
2320

2421
public class DataGenerator implements Runnable {
25-
final static Map<String, String> RETENTION_CONFIG = Collections.singletonMap(RETENTION_MS_CONFIG, String.valueOf(60 * 60 * 1000)); // 1 hour
26-
final static int KAFKA_ADMIN_CLIENT_REQUEST_TIMEOUT_MS_CONFIG = 5000;
27-
final static String KAFKA_ADMIN_CLIENT_ID_CONFIG = "data-generator-admin-client";
28-
final String bootstrapServers;
22+
static final Map<String, String> RETENTION_CONFIG = Collections.singletonMap(RETENTION_MS_CONFIG, String.valueOf(60 * 60 * 1000)); // 1 hour
2923
final List<Data> dataTypes;
3024
final Properties kafkaAdminProps;
3125

32-
public DataGenerator(String bootstrapServers, List<Data> dataTypes) {
33-
this.bootstrapServers = bootstrapServers;
26+
public DataGenerator(List<Data> dataTypes) {
3427
this.dataTypes = dataTypes;
3528

36-
kafkaAdminProps = new Properties();
37-
kafkaAdminProps.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
38-
kafkaAdminProps.put(CLIENT_ID_CONFIG, KAFKA_ADMIN_CLIENT_ID_CONFIG);
39-
kafkaAdminProps.put(REQUEST_TIMEOUT_MS_CONFIG, String.valueOf(KAFKA_ADMIN_CLIENT_REQUEST_TIMEOUT_MS_CONFIG));
29+
kafkaAdminProps = KafkaAdminProps.get();
4030
}
4131

4232
@Override
@@ -45,10 +35,10 @@ public void run() {
4535

4636
if (Boolean.parseBoolean(System.getenv("USE_APICURIO_REGISTRY"))) {
4737
String registryUrl = System.getenv("REGISTRY_URL");
48-
Producer<String, Object> producer = new KafkaProducer<>(KafkaClientProps.avro(bootstrapServers, registryUrl));
38+
Producer<String, Object> producer = new KafkaProducer<>(KafkaClientProps.avro(registryUrl));
4939
send(producer, () -> generateTopicRecords(this::generateAvroRecord));
5040
} else {
51-
Producer<String, String> producer = new KafkaProducer<>(KafkaClientProps.csv(bootstrapServers));
41+
Producer<String, String> producer = new KafkaProducer<>(KafkaClientProps.csv());
5242
send(producer, () -> generateTopicRecords(this::generateCsvRecord));
5343
}
5444
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package com.github.streamshub.kafka.data.generator;
2+
3+
import java.util.Properties;
4+
5+
public class KafkaAdminProps {
6+
static final String KAFKA_CLIENT_ID_CONFIG = "data-generator-admin-client";
7+
8+
public static Properties get() {
9+
return KafkaCommonProps.get(KAFKA_CLIENT_ID_CONFIG);
10+
}
11+
}

tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/KafkaClientProps.java

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,17 @@
88
import java.util.Properties;
99

1010
public class KafkaClientProps {
11-
public static Properties csv(String bootstrapServers) {
12-
Properties props = init(bootstrapServers);
11+
static final String KAFKA_CLIENT_ID_CONFIG = "data-generator-client";
12+
13+
public static Properties csv() {
14+
Properties props = KafkaCommonProps.get(KAFKA_CLIENT_ID_CONFIG);
1315
props.put("value.serializer",
1416
"org.apache.kafka.common.serialization.StringSerializer");
1517
return props;
1618
}
1719

18-
public static Properties avro(String bootstrapServers, String registryUrl) {
19-
Properties props = init(bootstrapServers);
20+
public static Properties avro(String registryUrl) {
21+
Properties props = KafkaCommonProps.get(KAFKA_CLIENT_ID_CONFIG);
2022
props.put("value.serializer", AvroKafkaSerializer.class.getName());
2123

2224
props.put(SerdeConfig.REGISTRY_URL, registryUrl);
@@ -28,13 +30,4 @@ public static Properties avro(String bootstrapServers, String registryUrl) {
2830

2931
return props;
3032
}
31-
32-
private static Properties init(String bootstrapServer) {
33-
Properties props = new Properties();
34-
props.put("bootstrap.servers", bootstrapServer);
35-
36-
props.put("key.serializer",
37-
"org.apache.kafka.common.serialization.StringSerializer");
38-
return props;
39-
}
4033
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package com.github.streamshub.kafka.data.generator;
2+
3+
import java.util.Properties;
4+
5+
import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
6+
import static org.apache.kafka.clients.CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG;
7+
import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
8+
import static org.apache.kafka.clients.CommonClientConfigs.CLIENT_ID_CONFIG;
9+
import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG;
10+
import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG;
11+
import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG;
12+
import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG;
13+
14+
public class KafkaCommonProps {
15+
static final int KAFKA_CLIENT_REQUEST_TIMEOUT_MS_CONFIG = 5000;
16+
17+
public static Properties get(String clientId) {
18+
Properties props = new Properties();
19+
props.put(BOOTSTRAP_SERVERS_CONFIG, System.getenv("KAFKA_BOOTSTRAP_SERVERS"));
20+
props.put(CLIENT_ID_CONFIG, clientId);
21+
props.put(REQUEST_TIMEOUT_MS_CONFIG, String.valueOf(KAFKA_CLIENT_REQUEST_TIMEOUT_MS_CONFIG));
22+
23+
String securityProtocol = System.getenv("KAFKA_SECURITY_PROTOCOL");
24+
25+
if (securityProtocol != null && securityProtocol.equals("SSL")) {
26+
props.put(SECURITY_PROTOCOL_CONFIG, securityProtocol);
27+
props.put(SSL_TRUSTSTORE_LOCATION_CONFIG, System.getenv("KAFKA_SSL_TRUSTSTORE_LOCATION"));
28+
props.put(SSL_TRUSTSTORE_TYPE_CONFIG, System.getenv("KAFKA_SSL_TRUSTSTORE_TYPE"));
29+
props.put(SSL_KEYSTORE_LOCATION_CONFIG, System.getenv("KAFKA_SSL_KEYSTORE_LOCATION"));
30+
props.put(SSL_KEYSTORE_PASSWORD_CONFIG, System.getenv("KAFKA_SSL_KEYSTORE_PASSWORD"));
31+
}
32+
33+
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
34+
35+
return props;
36+
}
37+
}

tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/Main.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,10 @@
99

1010
public class Main {
1111
public static void main(String[] args) {
12-
String bootstrapServers = System.getenv("KAFKA_BOOTSTRAP_SERVERS");
1312
List<Data> dataTypes = Arrays.stream(System.getenv("DATA").split(","))
1413
.map(Main::getDataClass)
1514
.toList();
16-
Thread dgThread = new Thread(new DataGenerator(bootstrapServers, dataTypes));
15+
Thread dgThread = new Thread(new DataGenerator(dataTypes));
1716
dgThread.start();
1817
}
1918

0 commit comments

Comments
 (0)