Skip to content

Commit f9d8abe

Browse files
Updates for OKafka Lab (#1133)
Signed-off-by: Anders Swanson <[email protected]>
1 parent 2c7c296 commit f9d8abe

File tree

8 files changed

+133
-21
lines changed

8 files changed

+133
-21
lines changed

code-teq/okafka-lab/docker-compose.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
services:
2-
okafkadb:
2+
okafka:
33
image: gvenzl/oracle-free:23.9-slim-faststart
4-
container_name: okafkadb
4+
container_name: okafka
55
ports:
66
- 9092:1521
77
environment:

code-teq/okafka-lab/oraclefree/grant_permissions.sql

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@ alter session set container = freepdb1;
44
-- user for okafka
55
create user TESTUSER identified by testpwd;
66
grant create session to TESTUSER;
7-
grant unlimited tablespace to TESTUSER;
8-
grant connect, resource to TESTUSER;
7+
grant resource, connect, unlimited tablespace to TESTUSER;
98

109
-- okafka permissions
10+
grant aq_user_role to TESTUSER;
1111
grant execute on dbms_aq to TESTUSER;
1212
grant execute on dbms_aqadm to TESTUSER;
1313
grant select on gv_$session to TESTUSER;
@@ -19,3 +19,9 @@ grant select on gv_$pdbs to TESTUSER;
1919
grant select on user_queue_partition_assignment_table to TESTUSER;
2020
exec dbms_aqadm.GRANT_PRIV_FOR_RM_PLAN('TESTUSER');
2121
commit;
22+
23+
create table testuser.log (
24+
id number generated always as identity primary key,
25+
produced timestamp,
26+
consumed timestamp
27+
);

code-teq/okafka-lab/src/main/java/com/example/okafka/CreateTopic.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import java.util.concurrent.ExecutionException;
1111

1212
import static com.example.okafka.OKafka.TOPIC_NAME;
13+
import static com.example.okafka.OKafka.TRANSACTIONAL_TOPIC_NAME;
1314
import static com.example.okafka.OKafkaAuthentication.getAuthenticationProperties;
1415

1516
public class CreateTopic {
@@ -19,7 +20,8 @@ public static void main(String[] args) {
1920

2021
try (Admin admin = AdminClient.create(props)) {
2122
NewTopic testTopic = new NewTopic(TOPIC_NAME, 1, (short) 1);
22-
admin.createTopics(List.of(testTopic))
23+
NewTopic transactionalTestTopic = new NewTopic(TRANSACTIONAL_TOPIC_NAME, 1, (short) 1);
24+
admin.createTopics(List.of(testTopic, transactionalTestTopic))
2325
.all()
2426
.get();
2527
System.out.println("[ADMIN] Created topic: " + testTopic.name());

code-teq/okafka-lab/src/main/java/com/example/okafka/OKafka.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
public interface OKafka {
44
String TOPIC_NAME = "test_topic";
55

6+
String TRANSACTIONAL_TOPIC_NAME = "test_transactional_topic";
7+
68
static String getEnv(String key, String defaultValue) {
79
String value = System.getenv(key);
810
if (value == null || value.isEmpty()) {

code-teq/okafka-lab/src/main/java/com/example/okafka/OKafkaConsumer.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ public static void main(String[] args) {
3333
for (ConsumerRecord<String, String> record : records) {
3434
System.out.println("Consumed record: " + record.value());
3535
}
36+
37+
consumer.commitSync();
3638
}
3739
}
3840
}

code-teq/okafka-lab/src/main/java/com/example/okafka/OKafkaProducer.java

Lines changed: 7 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,6 @@
66

77
import java.time.Instant;
88
import java.util.Properties;
9-
import java.util.concurrent.Executors;
10-
import java.util.concurrent.ScheduledExecutorService;
11-
import java.util.concurrent.TimeUnit;
129

1310
import static com.example.okafka.OKafka.TOPIC_NAME;
1411
import static com.example.okafka.OKafkaAuthentication.getAuthenticationProperties;
@@ -21,26 +18,20 @@ public static void main(String[] args) throws InterruptedException {
2118
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
2219
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
2320
Producer<String, String> producer = new KafkaProducer<>(props);
24-
Runnable producerThread = () -> {
25-
Instant now = Instant.now();
26-
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "Message: " + now);
27-
producer.send(record);
28-
System.out.println("Producer sent message: " + record.value());
29-
};
30-
31-
3221

3322
int pauseMillis = 1000;
3423
String pm = System.getenv("PAUSE_MILLIS");
3524
if (pm != null && !pm.isEmpty()) {
3625
pauseMillis = Integer.parseInt(pm);
3726
}
3827

39-
try (ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
40-
) {
41-
System.out.println("Starting producer");
42-
scheduler.scheduleAtFixedRate(producerThread, 0, pauseMillis, TimeUnit.MILLISECONDS);
28+
while (true) {
29+
Instant now = Instant.now();
30+
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "Message: " + now);
31+
producer.send(record);
32+
System.out.println("Producer sent message: " + record.value());
33+
34+
Thread.sleep(pauseMillis);
4335
}
44-
Thread.currentThread().join();
4536
}
4637
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package com.example.okafka;
2+
3+
import org.apache.kafka.clients.consumer.ConsumerRecord;
4+
import org.apache.kafka.clients.consumer.ConsumerRecords;
5+
import org.oracle.okafka.clients.consumer.KafkaConsumer;
6+
7+
import java.sql.Connection;
8+
import java.sql.Date;
9+
import java.sql.PreparedStatement;
10+
import java.sql.SQLException;
11+
import java.time.Duration;
12+
import java.time.Instant;
13+
import java.util.List;
14+
import java.util.Properties;
15+
16+
import static com.example.okafka.OKafka.TRANSACTIONAL_TOPIC_NAME;
17+
import static com.example.okafka.OKafkaAuthentication.getAuthenticationProperties;
18+
19+
public class TransactionalConsumer {
20+
public static void main(String[] args) throws SQLException {
21+
Properties props = getAuthenticationProperties();
22+
23+
// Note the use of standard Kafka properties for OKafka configuration.
24+
props.put("group.id" , "TRANSACTIONAL_CONSUMER");
25+
props.put("enable.auto.commit","false");
26+
props.put("max.poll.records", 50);
27+
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
28+
props.put("value.deserializer", "org.apache.kafka.common.serialization.LongDeserializer");
29+
props.put("auto.offset.reset", "earliest");
30+
31+
KafkaConsumer<String, Long> consumer = new KafkaConsumer<>(props);
32+
33+
consumer.subscribe(List.of(TRANSACTIONAL_TOPIC_NAME));
34+
System.out.println("Subscribed to topic " + TRANSACTIONAL_TOPIC_NAME);
35+
while (true) {
36+
Connection conn = consumer.getDBConnection();
37+
String sql = "update log set consumed = ? where id = ?";
38+
ConsumerRecords<String, Long> records = consumer.poll(Duration.ofSeconds(3));
39+
for (ConsumerRecord<String, Long> record : records) {
40+
try (PreparedStatement ps = conn.prepareStatement(sql)) {
41+
ps.setDate(1, new Date(Instant.now().toEpochMilli()));
42+
ps.setLong(2, record.value());
43+
ps.executeUpdate();
44+
}
45+
System.out.println("Consumed record: " + record.value());
46+
}
47+
48+
consumer.commitSync();
49+
}
50+
}
51+
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package com.example.okafka;
2+
3+
import org.apache.kafka.clients.producer.ProducerRecord;
4+
import org.oracle.okafka.clients.producer.KafkaProducer;
5+
6+
import java.sql.*;
7+
import java.time.Instant;
8+
import java.util.Properties;
9+
10+
import static com.example.okafka.OKafka.TRANSACTIONAL_TOPIC_NAME;
11+
import static com.example.okafka.OKafkaAuthentication.getAuthenticationProperties;
12+
13+
public class TransactionalProducer {
14+
public static void main(String[] args) throws InterruptedException, SQLException {
15+
Properties props = getAuthenticationProperties();
16+
props.put("enable.idempotence", "true");
17+
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
18+
props.put("value.serializer", "org.apache.kafka.common.serialization.LongSerializer");
19+
// This property is required for transactional producers
20+
props.put("oracle.transactional.producer", "true");
21+
KafkaProducer<String, Long> producer = new KafkaProducer<>(props);
22+
producer.initTransactions();
23+
24+
int pauseMillis = 1000;
25+
String pm = System.getenv("PAUSE_MILLIS");
26+
if (pm != null && !pm.isEmpty()) {
27+
pauseMillis = Integer.parseInt(pm);
28+
}
29+
30+
while (true) {
31+
Instant now = Instant.now();
32+
long id;
33+
producer.beginTransaction();
34+
Connection conn = producer.getDBConnection();
35+
36+
final String sql = "insert into log (produced) values (?)";
37+
try (PreparedStatement ps = conn.prepareStatement(sql, new String[]{"id",})) {
38+
ps.setDate(1, new Date(now.toEpochMilli()));
39+
ps.executeUpdate();
40+
41+
ResultSet generatedKeys = ps.getGeneratedKeys();
42+
if (generatedKeys.next()) {
43+
id = generatedKeys.getLong(1);
44+
} else {
45+
throw new SQLException("Create log message failed, no ID obtained");
46+
}
47+
}
48+
49+
ProducerRecord<String, Long> record = new ProducerRecord<>(TRANSACTIONAL_TOPIC_NAME, id);
50+
producer.send(record);
51+
52+
producer.commitTransaction();
53+
System.out.println("Producer sent message: " + record.value());
54+
55+
Thread.sleep(pauseMillis);
56+
}
57+
}
58+
}

0 commit comments

Comments
 (0)