Skip to content

Commit 2c7c296

Browse files
OKafka Lab (#1132)
Signed-off-by: Anders Swanson <[email protected]>
1 parent e888ed9 commit 2c7c296

File tree

9 files changed

+261
-0
lines changed

9 files changed

+261
-0
lines changed
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
services:
2+
okafkadb:
3+
image: gvenzl/oracle-free:23.9-slim-faststart
4+
container_name: okafkadb
5+
ports:
6+
- 9092:1521
7+
environment:
8+
- ORACLE_PASSWORD=Welcome12345
9+
volumes:
10+
- ./oraclefree:/container-entrypoint-initdb.d
11+
healthcheck:
12+
test: ["CMD-SHELL", "lsnrctl status | grep READY"]
13+
interval: 15s
14+
timeout: 10s
15+
retries: 5
16+
start_period: 30s
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
-- Set as appropriate for your database.
2+
alter session set container = freepdb1;
3+
4+
-- user for okafka
5+
create user TESTUSER identified by testpwd;
6+
grant create session to TESTUSER;
7+
grant unlimited tablespace to TESTUSER;
8+
grant connect, resource to TESTUSER;
9+
10+
-- okafka permissions
11+
grant execute on dbms_aq to TESTUSER;
12+
grant execute on dbms_aqadm to TESTUSER;
13+
grant select on gv_$session to TESTUSER;
14+
grant select on v_$session to TESTUSER;
15+
grant select on gv_$instance to TESTUSER;
16+
grant select on gv_$listener_network to TESTUSER;
17+
grant select on SYS.DBA_RSRC_PLAN_DIRECTIVES to TESTUSER;
18+
grant select on gv_$pdbs to TESTUSER;
19+
grant select on user_queue_partition_assignment_table to TESTUSER;
20+
exec dbms_aqadm.GRANT_PRIV_FOR_RM_PLAN('TESTUSER');
21+
commit;

code-teq/okafka-lab/pom.xml

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<modelVersion>4.0.0</modelVersion>
6+
7+
<artifactId>oracle-database-kafka-apis</artifactId>
8+
<build>
9+
<plugins>
10+
<plugin>
11+
<groupId>org.apache.maven.plugins</groupId>
12+
<artifactId>maven-compiler-plugin</artifactId>
13+
<configuration>
14+
<source>9</source>
15+
<target>9</target>
16+
</configuration>
17+
</plugin>
18+
</plugins>
19+
</build>
20+
<groupId>com.example.okafka</groupId>
21+
<name>oracle-kafka-apis</name>
22+
<description>Oracle Kafka API Example</description>
23+
<version>1.0.0</version>
24+
25+
<properties>
26+
<java.version>21</java.version>
27+
</properties>
28+
29+
<dependencies>
30+
<!-- OKafka All-in-one -->
31+
<dependency>
32+
<groupId>com.oracle.database.messaging</groupId>
33+
<artifactId>okafka</artifactId>
34+
<version>23.6.1.0</version>
35+
</dependency>
36+
37+
</dependencies>
38+
39+
40+
</project>
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package com.example.okafka;
2+
3+
import org.apache.kafka.clients.admin.Admin;
4+
import org.apache.kafka.clients.admin.NewTopic;
5+
import org.apache.kafka.common.errors.TopicExistsException;
6+
import org.oracle.okafka.clients.admin.AdminClient;
7+
8+
import java.util.List;
9+
import java.util.Properties;
10+
import java.util.concurrent.ExecutionException;
11+
12+
import static com.example.okafka.OKafka.TOPIC_NAME;
13+
import static com.example.okafka.OKafkaAuthentication.getAuthenticationProperties;
14+
15+
public class CreateTopic {
16+
public static void main(String[] args) {
17+
// Authentication properties to connect to Kafka
18+
Properties props = getAuthenticationProperties();
19+
20+
try (Admin admin = AdminClient.create(props)) {
21+
NewTopic testTopic = new NewTopic(TOPIC_NAME, 1, (short) 1);
22+
admin.createTopics(List.of(testTopic))
23+
.all()
24+
.get();
25+
System.out.println("[ADMIN] Created topic: " + testTopic.name());
26+
} catch (ExecutionException | InterruptedException e) {
27+
if (e.getCause() instanceof TopicExistsException) {
28+
System.out.println("[ADMIN] Topic already exists");
29+
} else {
30+
throw new RuntimeException(e);
31+
}
32+
}
33+
}
34+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package com.example.okafka;
2+
3+
public interface OKafka {
4+
String TOPIC_NAME = "test_topic";
5+
6+
static String getEnv(String key, String defaultValue) {
7+
String value = System.getenv(key);
8+
if (value == null || value.isEmpty()) {
9+
return defaultValue;
10+
}
11+
return value;
12+
}
13+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package com.example.okafka;
2+
3+
import java.util.Properties;
4+
5+
import static com.example.okafka.OKafka.getEnv;
6+
7+
public class OKafkaAuthentication {
8+
// For this example, we'll configure our authentication parameters with environment variables.
9+
// The security.protocol property supports PLAINTEXT (insecure) and SSL (secure) authentication.
10+
private static final String securityProtocol = getEnv("SECURITY_PROTOCOL", "PLAINTEXT");
11+
12+
// For PLAINTEXT authentication, provide the HOSTNAME:PORT as the bootstrap.servers property.
13+
private static final String bootstrapServers = getEnv("BOOTSTRAP_SERVERS", "localhost:9092");
14+
15+
// The TNS Admin alias / Oracle Database Service name.
16+
private static final String tnsAdmin = getEnv("TNS_ADMIN", "freepdb1");
17+
18+
// The directory containing the database wallet. For PLAINTEXT, this directory need only
19+
// contain an ojdbc.properties file with the "user" and "password" properties configured.
20+
private static final String walletDir = getEnv("WALLET_DIR", "./wallet");
21+
22+
/**
23+
* Create a Java Properties object for Oracle AI Database OKafka connection.
24+
* Configure using the SECURITY_PROTOCOL, BOOTSTRAP_SERVERS, TNS_ADMIN, and WALLET_DIR environment variables.
25+
* @return configured Properties object.
26+
*/
27+
public static Properties getAuthenticationProperties() {
28+
// Just like kafka-clients, we can use a Java Properties object to configure connection parameters.
29+
Properties props = new Properties();
30+
31+
// oracle.service.name is a custom property to configure the Database service name.
32+
props.put("oracle.service.name", tnsAdmin);
33+
// oracle.net.tns_admin is a custom property to configure the directory containing Oracle Database connection files.
34+
// If you are using mTLS authentication, client certificates must be present in this directory.
35+
props.put("oracle.net.tns_admin", walletDir);
36+
// security.protocol is a standard Kafka property, set to PLAINTEXT or SSL for Oracle Database.
37+
// (SASL is not supported with Oracle Database).
38+
props.put("security.protocol", securityProtocol);
39+
if (securityProtocol.equals("SSL")) {
40+
// For SSL authentication, pass the TNS alias (such as "mydb_tp") to be used from the tnsnames.ora file
41+
// found in the WALLET_DIR directory.
42+
props.put("tns.alias", tnsAdmin);
43+
} else {
44+
// For PLAINTEXT authentication, we provide the database URL in the format
45+
// HOSTNAME:PORT as the bootstrap.servers property.
46+
props.put("bootstrap.servers", bootstrapServers);
47+
}
48+
49+
return props;
50+
}
51+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package com.example.okafka;
2+
3+
import org.apache.kafka.clients.consumer.Consumer;
4+
import org.apache.kafka.clients.consumer.ConsumerRecord;
5+
import org.apache.kafka.clients.consumer.ConsumerRecords;
6+
import org.oracle.okafka.clients.consumer.KafkaConsumer;
7+
8+
import java.time.Duration;
9+
import java.util.List;
10+
import java.util.Properties;
11+
12+
import static com.example.okafka.OKafka.TOPIC_NAME;
13+
import static com.example.okafka.OKafkaAuthentication.getAuthenticationProperties;
14+
15+
public class OKafkaConsumer {
16+
public static void main(String[] args) {
17+
Properties props = getAuthenticationProperties();
18+
19+
// Note the use of standard Kafka properties for OKafka configuration.
20+
props.put("group.id" , "TEST_CONSUMER");
21+
props.put("enable.auto.commit","false");
22+
props.put("max.poll.records", 50);
23+
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
24+
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
25+
props.put("auto.offset.reset", "earliest");
26+
27+
Consumer<String, String> consumer = new KafkaConsumer<>(props);
28+
29+
consumer.subscribe(List.of(TOPIC_NAME));
30+
System.out.println("Subscribed to topic " + TOPIC_NAME);
31+
while (true) {
32+
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(3));
33+
for (ConsumerRecord<String, String> record : records) {
34+
System.out.println("Consumed record: " + record.value());
35+
}
36+
}
37+
}
38+
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package com.example.okafka;
2+
3+
import org.apache.kafka.clients.producer.Producer;
4+
import org.apache.kafka.clients.producer.ProducerRecord;
5+
import org.oracle.okafka.clients.producer.KafkaProducer;
6+
7+
import java.time.Instant;
8+
import java.util.Properties;
9+
import java.util.concurrent.Executors;
10+
import java.util.concurrent.ScheduledExecutorService;
11+
import java.util.concurrent.TimeUnit;
12+
13+
import static com.example.okafka.OKafka.TOPIC_NAME;
14+
import static com.example.okafka.OKafkaAuthentication.getAuthenticationProperties;
15+
16+
public class OKafkaProducer {
17+
18+
public static void main(String[] args) throws InterruptedException {
19+
Properties props = getAuthenticationProperties();
20+
props.put("enable.idempotence", "true");
21+
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
22+
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
23+
Producer<String, String> producer = new KafkaProducer<>(props);
24+
Runnable producerThread = () -> {
25+
Instant now = Instant.now();
26+
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "Message: " + now);
27+
producer.send(record);
28+
System.out.println("Producer sent message: " + record.value());
29+
};
30+
31+
32+
33+
int pauseMillis = 1000;
34+
String pm = System.getenv("PAUSE_MILLIS");
35+
if (pm != null && !pm.isEmpty()) {
36+
pauseMillis = Integer.parseInt(pm);
37+
}
38+
39+
try (ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
40+
) {
41+
System.out.println("Starting producer");
42+
scheduler.scheduleAtFixedRate(producerThread, 0, pauseMillis, TimeUnit.MILLISECONDS);
43+
}
44+
Thread.currentThread().join();
45+
}
46+
}
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
user = testuser
2+
password = testpwd

0 commit comments

Comments
 (0)