Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions code-teq/okafka-lab/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
services:
okafkadb:
okafka:
image: gvenzl/oracle-free:23.9-slim-faststart
container_name: okafkadb
container_name: okafka
ports:
- 9092:1521
environment:
Expand Down
10 changes: 8 additions & 2 deletions code-teq/okafka-lab/oraclefree/grant_permissions.sql
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ alter session set container = freepdb1;
-- user for okafka
create user TESTUSER identified by testpwd;
grant create session to TESTUSER;
grant unlimited tablespace to TESTUSER;
grant connect, resource to TESTUSER;
grant resource, connect, unlimited tablespace to TESTUSER;

-- okafka permissions
grant aq_user_role to TESTUSER;
grant execute on dbms_aq to TESTUSER;
grant execute on dbms_aqadm to TESTUSER;
grant select on gv_$session to TESTUSER;
Expand All @@ -19,3 +19,9 @@ grant select on gv_$pdbs to TESTUSER;
grant select on user_queue_partition_assignment_table to TESTUSER;
exec dbms_aqadm.GRANT_PRIV_FOR_RM_PLAN('TESTUSER');
commit;

create table testuser.log (
id number generated always as identity primary key,
produced timestamp,
consumed timestamp
);
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import java.util.concurrent.ExecutionException;

import static com.example.okafka.OKafka.TOPIC_NAME;
import static com.example.okafka.OKafka.TRANSACTIONAL_TOPIC_NAME;
import static com.example.okafka.OKafkaAuthentication.getAuthenticationProperties;

public class CreateTopic {
Expand All @@ -19,7 +20,8 @@ public static void main(String[] args) {

try (Admin admin = AdminClient.create(props)) {
NewTopic testTopic = new NewTopic(TOPIC_NAME, 1, (short) 1);
admin.createTopics(List.of(testTopic))
NewTopic transactionalTestTopic = new NewTopic(TRANSACTIONAL_TOPIC_NAME, 1, (short) 1);
admin.createTopics(List.of(testTopic, transactionalTestTopic))
.all()
.get();
System.out.println("[ADMIN] Created topic: " + testTopic.name());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
public interface OKafka {
String TOPIC_NAME = "test_topic";

String TRANSACTIONAL_TOPIC_NAME = "test_transactional_topic";

static String getEnv(String key, String defaultValue) {
String value = System.getenv(key);
if (value == null || value.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public static void main(String[] args) {
for (ConsumerRecord<String, String> record : records) {
System.out.println("Consumed record: " + record.value());
}

consumer.commitSync();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,6 @@

import java.time.Instant;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import static com.example.okafka.OKafka.TOPIC_NAME;
import static com.example.okafka.OKafkaAuthentication.getAuthenticationProperties;
Expand All @@ -21,26 +18,20 @@ public static void main(String[] args) throws InterruptedException {
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
Runnable producerThread = () -> {
Instant now = Instant.now();
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "Message: " + now);
producer.send(record);
System.out.println("Producer sent message: " + record.value());
};



int pauseMillis = 1000;
String pm = System.getenv("PAUSE_MILLIS");
if (pm != null && !pm.isEmpty()) {
pauseMillis = Integer.parseInt(pm);
}

try (ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
) {
System.out.println("Starting producer");
scheduler.scheduleAtFixedRate(producerThread, 0, pauseMillis, TimeUnit.MILLISECONDS);
while (true) {
Instant now = Instant.now();
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "Message: " + now);
producer.send(record);
System.out.println("Producer sent message: " + record.value());

Thread.sleep(pauseMillis);
}
Thread.currentThread().join();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package com.example.okafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.oracle.okafka.clients.consumer.KafkaConsumer;

import java.sql.Connection;
import java.sql.Date;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Properties;

import static com.example.okafka.OKafka.TRANSACTIONAL_TOPIC_NAME;
import static com.example.okafka.OKafkaAuthentication.getAuthenticationProperties;

public class TransactionalConsumer {
public static void main(String[] args) throws SQLException {
Properties props = getAuthenticationProperties();

// Note the use of standard Kafka properties for OKafka configuration.
props.put("group.id" , "TRANSACTIONAL_CONSUMER");
props.put("enable.auto.commit","false");
props.put("max.poll.records", 50);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.LongDeserializer");
props.put("auto.offset.reset", "earliest");

KafkaConsumer<String, Long> consumer = new KafkaConsumer<>(props);

consumer.subscribe(List.of(TRANSACTIONAL_TOPIC_NAME));
System.out.println("Subscribed to topic " + TRANSACTIONAL_TOPIC_NAME);
while (true) {
Connection conn = consumer.getDBConnection();
String sql = "update log set consumed = ? where id = ?";
ConsumerRecords<String, Long> records = consumer.poll(Duration.ofSeconds(3));
for (ConsumerRecord<String, Long> record : records) {
try (PreparedStatement ps = conn.prepareStatement(sql)) {
ps.setDate(1, new Date(Instant.now().toEpochMilli()));
ps.setLong(2, record.value());
ps.executeUpdate();
}
System.out.println("Consumed record: " + record.value());
}

consumer.commitSync();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package com.example.okafka;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.oracle.okafka.clients.producer.KafkaProducer;

import java.sql.*;
import java.time.Instant;
import java.util.Properties;

import static com.example.okafka.OKafka.TRANSACTIONAL_TOPIC_NAME;
import static com.example.okafka.OKafkaAuthentication.getAuthenticationProperties;

public class TransactionalProducer {
public static void main(String[] args) throws InterruptedException, SQLException {
Properties props = getAuthenticationProperties();
props.put("enable.idempotence", "true");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.LongSerializer");
// This property is required for transactional producers
props.put("oracle.transactional.producer", "true");
KafkaProducer<String, Long> producer = new KafkaProducer<>(props);
producer.initTransactions();

int pauseMillis = 1000;
String pm = System.getenv("PAUSE_MILLIS");
if (pm != null && !pm.isEmpty()) {
pauseMillis = Integer.parseInt(pm);
}

while (true) {
Instant now = Instant.now();
long id;
producer.beginTransaction();
Connection conn = producer.getDBConnection();

final String sql = "insert into log (produced) values (?)";
try (PreparedStatement ps = conn.prepareStatement(sql, new String[]{"id",})) {
ps.setDate(1, new Date(now.toEpochMilli()));
ps.executeUpdate();

ResultSet generatedKeys = ps.getGeneratedKeys();
if (generatedKeys.next()) {
id = generatedKeys.getLong(1);
} else {
throw new SQLException("Create log message failed, no ID obtained");
}
}

ProducerRecord<String, Long> record = new ProducerRecord<>(TRANSACTIONAL_TOPIC_NAME, id);
producer.send(record);

producer.commitTransaction();
System.out.println("Producer sent message: " + record.value());

Thread.sleep(pauseMillis);
}
}
}