Skip to content

Commit 06f704b

Browse files
committed
Add a simple producer/consumer Kafka example
1 parent c807aaa commit 06f704b

File tree

3 files changed

+54
-0
lines changed

3 files changed

+54
-0
lines changed

src/main/resources/application.properties

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,12 @@ mp.messaging.incoming.articles-in.topic=articles
3333
mp.messaging.incoming.articles-in.value.deserializer=org.acme.kafka.ArticleDeserializer
3434
mp.messaging.outgoing.articles-out.connector=smallrye-kafka
3535
mp.messaging.outgoing.articles-out.topic=articles-processed
36+
# The following is the Kafka configuration for the Hello World Kafka Example
37+
quarkus.kafka.devservices.topic-partitions.hello-world=1
38+
mp.messaging.outgoing.hello-world-out.connector=smallrye-kafka
39+
mp.messaging.outgoing.hello-world-out.topic=hello-world
40+
mp.messaging.incoming.hello-world-in.connector=smallrye-kafka
41+
mp.messaging.incoming.hello-world-in.topic=hello-world
3642
### -------------------PERSISTENCE CONFIGURATION------------------------- ###
3743
# Datasource configuration
3844
quarkus.datasource.db-kind=postgresql
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package org.acme.kafka.helloworld
2+
3+
import io.quarkus.logging.Log
4+
5+
import io.smallrye.reactive.messaging.annotations.Blocking
6+
import jakarta.enterprise.context.ApplicationScoped
7+
import org.eclipse.microprofile.metrics.annotation.Counted
8+
import org.eclipse.microprofile.reactive.messaging.Incoming
9+
10+
// This is a simple Kafka consumer that reads messages from the "hello-world-out" channel and logs them.
11+
12+
@ApplicationScoped
13+
class HelloWorldConsumer:
14+
@Incoming("hello-world-in")
15+
@Blocking
16+
@Counted(name = "consumedHelloMessages", description = "How many hello messages were consumed")
17+
def consumeMessage(msg: String): Unit =
18+
Log.info(s"Received message: ${msg}")
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package org.acme.kafka.helloworld
2+
3+
import java.util.concurrent.TimeUnit
4+
5+
import io.quarkus.logging.Log
6+
import io.quarkus.scheduler.Scheduled
7+
8+
import jakarta.enterprise.context.ApplicationScoped
9+
import jakarta.transaction.Transactional
10+
import org.eclipse.microprofile.metrics.annotation.Counted
11+
import org.eclipse.microprofile.reactive.messaging.{Channel, Emitter}
12+
13+
// This is a simple Kafka producer that sends a message every second to the Kafka topic "hello-world" via the channel "hello-world-in" with a sequential ID and an identifier from the PRODUCER_ID environment variable.
14+
15+
@ApplicationScoped
16+
class HelloWorldProducer(
17+
@Channel("hello-world-out") emitter: Emitter[String]
18+
):
19+
// Store a sequential ID for the messages
20+
private var id = 0
21+
@Transactional
22+
@Scheduled(every = "1s", delay = 5, delayUnit = TimeUnit.SECONDS, identity = "hello-world-producer")
23+
@Counted(name = "producedHelloMessages", description = "How many hello messages were produced")
24+
def produce() =
25+
val producerId = scala.util.Properties.envOrElse("PRODUCER_ID", "unknown")
26+
val message = s"Hello World from $producerId - $id"
27+
Log.info("Producing a message: " + message)
28+
emitter.send(message)
29+
id += 1
30+
end HelloWorldProducer

0 commit comments

Comments
 (0)