diff --git a/code-examples/kafka-java-maven/.gitignore b/code-examples/kafka-java-maven/.gitignore
new file mode 100644
index 000000000..e673575a7
--- /dev/null
+++ b/code-examples/kafka-java-maven/.gitignore
@@ -0,0 +1,2 @@
+.idea/
+target/
\ No newline at end of file
diff --git a/code-examples/kafka-java-maven/pom.xml b/code-examples/kafka-java-maven/pom.xml
new file mode 100644
index 000000000..7e738fe19
--- /dev/null
+++ b/code-examples/kafka-java-maven/pom.xml
@@ -0,0 +1,65 @@
+
+
+ 4.0.0
+
+ org.example
+ kafka-java-maven
+ 1.0-SNAPSHOT
+
+
+ 11
+ 11
+ UTF-8
+
+
+
+
+
+ org.apache.kafka
+ kafka-clients
+ 3.2.1
+
+
+
+ com.fasterxml.jackson.dataformat
+ jackson-dataformat-xml
+ 2.8.5
+
+
+
+
+
+ producer
+
+
+
+ org.codehaus.mojo
+ exec-maven-plugin
+ 3.0.0
+
+ org.example.ProducerExample
+
+
+
+
+
+
+ consumer
+
+
+
+ org.codehaus.mojo
+ exec-maven-plugin
+ 3.0.0
+
+ org.example.ConsumerExample
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/code-examples/kafka-java-maven/src/main/java/org/example/ConsumerExample.java b/code-examples/kafka-java-maven/src/main/java/org/example/ConsumerExample.java
new file mode 100644
index 000000000..89cd0df0e
--- /dev/null
+++ b/code-examples/kafka-java-maven/src/main/java/org/example/ConsumerExample.java
@@ -0,0 +1,47 @@
+package org.example;
+
+import java.util.Arrays;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import java.time.Duration;
+
+public class ConsumerExample {
+
+ public static void main(String[] args) {
+
+ var properties= KafkaConfig.properties();
+ int MAX_MESSAGES_CONSUMED = 1;
+ int messagesCount = 0;
+
+ if(args.length > 0) {
+ MAX_MESSAGES_CONSUMED = Integer.parseInt(args[0]);
+ }
+
+ properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"test_group_2");
+ properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+ KafkaConsumer consumer = new KafkaConsumer(properties);
+
+ //Subscribing
+ consumer.subscribe(Arrays.asList("prices"));
+
+ //polling
+ while(true){
+ if(messagesCount >= MAX_MESSAGES_CONSUMED) {
+ break;
+ }
+ ConsumerRecords records=consumer.poll(Duration.ofMillis(100));
+ for(ConsumerRecord record: records) {
+ System.out.println("Key: "+ record.key() + ", Value:" +record.value());
+ System.out.println("Partition:" + record.partition()+",Offset:"+record.offset());
+ messagesCount ++;
+ }
+ }
+ }
+}
diff --git a/code-examples/kafka-java-maven/src/main/java/org/example/KafkaConfig.java b/code-examples/kafka-java-maven/src/main/java/org/example/KafkaConfig.java
new file mode 100644
index 000000000..fc261e4e5
--- /dev/null
+++ b/code-examples/kafka-java-maven/src/main/java/org/example/KafkaConfig.java
@@ -0,0 +1,30 @@
+package org.example;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+
+import java.util.Properties;
+
+public class KafkaConfig {
+
+ static Properties properties() {
+
+ String kafkaHost = System.getenv("KAFKA_HOST");
+ String rhoasClientID = System.getenv("RHOAS_SERVICE_ACCOUNT_CLIENT_ID");
+ String rhoasClientSecret = System.getenv("RHOAS_SERVICE_ACCOUNT_CLIENT_SECRET");
+ String rhoasOauthTokenUrl = System.getenv("RHOAS_SERVICE_ACCOUNT_OAUTH_TOKEN_URL");
+
+ var properties= new Properties();
+
+ properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHost);
+ properties.setProperty("security.protocol", "SASL_SSL");
+ properties.setProperty("sasl.mechanism", "OAUTHBEARER");
+
+ properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId=\"" + rhoasClientID + "\" clientSecret=\"" + rhoasClientSecret + "\" oauth.token.endpoint.uri=\"" + rhoasOauthTokenUrl + "\";");
+
+ properties.setProperty("sasl.login.callback.handler.class", "org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler");
+ properties.setProperty("sasl.oauthbearer.token.endpoint.url", rhoasOauthTokenUrl);
+ properties.setProperty("sasl.oauthbearer.scope.claim.name", "api.iam.service_accounts");
+
+ return properties;
+ }
+}
diff --git a/code-examples/kafka-java-maven/src/main/java/org/example/ProducerExample.java b/code-examples/kafka-java-maven/src/main/java/org/example/ProducerExample.java
new file mode 100644
index 000000000..013a0da87
--- /dev/null
+++ b/code-examples/kafka-java-maven/src/main/java/org/example/ProducerExample.java
@@ -0,0 +1,23 @@
+package org.example;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+public class ProducerExample {
+
+ public static void main(String[] args) {
+
+ //Creating producer properties
+ var properties= KafkaConfig.properties();
+ properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+
+ KafkaProducer producer= new KafkaProducer(properties);
+
+ producer.send(new ProducerRecord<>("prices", "Test Message"));
+ producer.flush();
+ producer.close();
+ }
+}
\ No newline at end of file