
Commit 968b7cf

Spring into CC with Kotlin Volume 2 - Kafka Streams(#3) (#279)
* Correcting command on terraform output to env file.
* new protobuf object definitions
* initial commit of cloud-stream-kafka
* new topics for kafka streams example
* abandoning protobuf for avro.
* simple kafka streams topology, filter platinum.
* Kafka-streams example.
* Update topology for only platinum and gold members.
* documentation and word count topics in terraform.
* Updates to README from code review

---------

Co-authored-by: Dave Troiano <[email protected]>
1 parent 5ec6fb8 commit 968b7cf

27 files changed: +1025 −15 lines

spring-into-kafka-cc/README.adoc

Lines changed: 1 addition & 1 deletion
@@ -38,7 +38,7 @@ You can validate your Confluent Cloud assets are provisioned via the Confluent C
 The code in this demo relies on the assets you just provisioned, and the configuration of this Spring Boot application needs those credentials and endpoints. Let's export them to a properties file in the `USER_HOME` directory for later use:
 
 ```shell
-terraform output -json | jq -r 'to_entries[] | .key + "=" + (.value.value | tostring)' | while read -r line ; do echo "$line"; done > . ~/tools/spring-into-cc.properties
+terraform output -json | jq -r 'to_entries[] | .key + "=" + (.value.value | tostring)' | while read -r line ; do echo "$line"; done > ~/tools/spring-into-cc.properties
 
 cat ~/tools/spring-into-cc.properties
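
The resulting file is plain `key=value` pairs, so downstream code can read it as standard Java properties. A minimal sketch of loading it from Kotlin (illustrative only, not part of this commit; the actual key names depend on your Terraform outputs):

```kotlin
import java.io.FileInputStream
import java.util.Properties

fun main() {
    // Read the file written by the terraform/jq pipeline above.
    val path = "${System.getProperty("user.home")}/tools/spring-into-cc.properties"
    val props = Properties()
    FileInputStream(path).use { props.load(it) }

    // Print what was exported; key names come from the Terraform output blocks.
    props.forEach { (key, value) -> println("$key=$value") }
}
```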

spring-into-kafka-cc/build.gradle.kts

Lines changed: 7 additions & 2 deletions
@@ -6,7 +6,12 @@ plugins {
     kotlin("plugin.spring") version "1.9.24" apply false
     id("com.bakdata.avro") version "1.0.0" apply false
     kotlin("plugin.serialization") version "2.0.0" apply false
+    id("com.google.protobuf") version "0.9.4" apply false
+
+}
+
+allprojects {
+    group = "io.confluent.devrel"
+    version = "0.0.2-SNAPSHOT"
 }
 
-group = "io.confluent.devrel"
-version = "0.0.1-SNAPSHOT"
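
With `group` and `version` now declared in an `allprojects` block, every module inherits the same coordinates from the root build. A quick way to confirm this (a sketch, assuming the `:common` subproject shown later in this commit):

```bash
# Each subproject should report the shared coordinates from the root build.
./gradlew :common:properties | grep -E "^(group|version):"
# group: io.confluent.devrel
# version: 0.0.2-SNAPSHOT
```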

spring-into-kafka-cc/common/build.gradle.kts

Lines changed: 8 additions & 8 deletions
@@ -22,13 +22,11 @@ sourceSets {
     }
 }
 
-val fakerVersion = "2.0.0-rc.3"
+val fakerVersion = providers.gradleProperty("kotlin_faker_version").get()
+val kafkaVersion = providers.gradleProperty("kafka_version").get()
+val confluentVersion = providers.gradleProperty("confluent_version").get()
 
 dependencies {
-    // implementation("org.slf4j:slf4j-api:2.0.11")
-    // implementation("org.slf4j:slf4j-simple:2.0.11")
-    // implementation("ch.qos.logback:logback-core:1.4.14")
-
     implementation(platform("io.github.serpro69:kotlin-faker-bom:$fakerVersion"))
     implementation("io.github.serpro69:kotlin-faker")
     implementation("io.github.serpro69:kotlin-faker-books")
@@ -37,9 +35,10 @@ dependencies {
     implementation("org.jetbrains.kotlinx:kotlinx-cli:0.3.6")
     implementation("org.jetbrains.kotlinx:kotlinx-datetime:0.6.0-RC.2")
 
-    implementation("org.apache.kafka:kafka-clients:3.7.0")
-    implementation("io.confluent:kafka-avro-serializer:7.6.0")
-    implementation("io.confluent:kafka-schema-rules:7.6.0")
+    implementation("org.apache.kafka:kafka-clients:$kafkaVersion")
+    implementation("io.confluent:kafka-avro-serializer:$confluentVersion")
+
+    implementation("io.confluent:kafka-schema-rules:$confluentVersion")
 
     testImplementation("org.jetbrains.kotlin:kotlin-test")
 }
@@ -53,3 +52,4 @@ kotlin {
 tasks.withType<Test> {
     useJUnitPlatform()
 }
+
Lines changed: 23 additions & 0 deletions

{
  "type": "record",
  "namespace": "io.confluent.devrel.spring.model.club",
  "name": "Checkin",
  "fields": [
    {
      "name": "txnId",
      "type": "string",
      "doc": "transaction id"
    },
    {
      "name": "memberId",
      "type": "string"
    },
    {
      "name": "txnTimestamp",
      "type": {
        "type": "long",
        "logicalType": "timestamp-millis"
      }
    }
  ]
}
Lines changed: 35 additions & 0 deletions

{
  "type": "record",
  "namespace": "io.confluent.devrel.spring.model.club",
  "name": "EnrichedCheckin",
  "fields": [
    {
      "name": "checkinTxnId",
      "type": [
        "null",
        "string"
      ],
      "doc": "transaction id",
      "default": null
    },
    {
      "name": "memberId",
      "type": "string"
    },
    {
      "name": "txnTimestamp",
      "type": [
        "null",
        {
          "type": "long",
          "logicalType": "timestamp-millis"
        }
      ],
      "default": null
    },
    {
      "name": "membershipLevel",
      "type": "io.confluent.devrel.spring.model.club.MembershipLevel"
    }
  ]
}
Lines changed: 35 additions & 0 deletions

{
  "type": "record",
  "namespace": "io.confluent.devrel.spring.model.club",
  "name": "Member",
  "fields": [
    {
      "name": "id",
      "type": "string"
    },
    {
      "name": "firstName",
      "type": "string"
    },
    {
      "name": "lastName",
      "type": "string"
    },
    {
      "name": "email",
      "type": "string"
    },
    {
      "name": "membershipLevel",
      "type": "io.confluent.devrel.spring.model.club.MembershipLevel",
      "default": "STANDARD"
    },
    {
      "name": "joinDate",
      "type": {
        "type": "int",
        "logicalType": "date"
      }
    }
  ]
}
Lines changed: 8 additions & 0 deletions

{
  "type": "enum",
  "namespace": "io.confluent.devrel.spring.model.club",
  "name": "MembershipLevel",
  "symbols": [
    "STANDARD", "SILVER", "GOLD", "PLATINUM"
  ]
}
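
Assuming these schemas are compiled to Java classes by the `com.bakdata.avro` plugin referenced in the root build, the generated builders enforce the defaults above: the nullable union fields in `EnrichedCheckin` may be omitted, while fields without defaults must be set before `build()`. A hedged sketch (the id value is hypothetical):

```kotlin
import io.confluent.devrel.spring.model.club.EnrichedCheckin
import io.confluent.devrel.spring.model.club.MembershipLevel

fun main() {
    // checkinTxnId and txnTimestamp are ["null", ...] unions with default null,
    // so the builder does not require them; memberId and membershipLevel have
    // no defaults and must be supplied or build() will throw.
    val enriched = EnrichedCheckin.newBuilder()
        .setMemberId("member-42") // hypothetical id
        .setMembershipLevel(MembershipLevel.GOLD)
        .build()

    println(enriched)
}
```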

spring-into-kafka-cc/common/src/main/kotlin/io/confluent/devrel/spring/kfaker/BaseKFaker.kt

Lines changed: 38 additions & 0 deletions
@@ -2,8 +2,14 @@ package io.confluent.devrel.spring.kfaker
 
 import io.confluent.devrel.spring.model.Address
 import io.confluent.devrel.spring.model.Customer
+import io.confluent.devrel.spring.model.club.Checkin
+import io.confluent.devrel.spring.model.club.Member
+import io.confluent.devrel.spring.model.club.MembershipLevel
 import io.github.serpro69.kfaker.faker
+import java.time.Clock
+import java.time.ZoneId
 import java.time.format.DateTimeFormatter
+import java.time.temporal.ChronoUnit
 
 class BaseKFaker {
 
@@ -43,4 +49,36 @@ class BaseKFaker {
             .setDob(dateTimeFormatter.format(kFaker.person.birthDate(age)))
             .build()
     }
+
+    fun fakeMemberId(): String = kFaker.random.nextUUID().toString()
+
+    fun member(memberId: String, level: MembershipLevel = MembershipLevel.STANDARD): Member {
+
+        val firstNameFaker = kFaker.name.firstName()
+        val lastNameFaker = kFaker.name.lastName()
+        val email = kFaker.internet.email("${firstNameFaker}.${lastNameFaker}")
+
+        val daysAgo = java.time.LocalDate.now().minusDays((100..10000).random().toLong())
+
+        return Member.newBuilder()
+            .setId(memberId)
+            .setFirstName(firstNameFaker)
+            .setLastName(lastNameFaker)
+            .setEmail(email)
+            .setMembershipLevel(level)
+            .setJoinDate(daysAgo)
+            .build()
+    }
+
+    fun checkin(memberId: String): Checkin {
+
+        val checkinTime = Clock.system(ZoneId.systemDefault()).instant()
+            .minus((1..15).random().toLong(), ChronoUnit.MINUTES)
+
+        return Checkin.newBuilder()
+            .setTxnId(kFaker.random.nextUUID())
+            .setMemberId(memberId)
+            .setTxnTimestamp(checkinTime)
+            .build()
+    }
 }
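
A hypothetical driver tying the new faker helpers to the Avro types above (not part of this commit):

```kotlin
import io.confluent.devrel.spring.kfaker.BaseKFaker
import io.confluent.devrel.spring.model.club.MembershipLevel

fun main() {
    val faker = BaseKFaker()

    // Generate a member and a recent check-in that share the same member id,
    // mirroring how the stream-table join example expects the two to correlate.
    val memberId = faker.fakeMemberId()
    val member = faker.member(memberId, MembershipLevel.PLATINUM)
    val checkin = faker.checkin(memberId)

    println("member:  $member")
    println("checkin: $checkin")
}
```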
Lines changed: 3 additions & 0 deletions

confluent_version=7.7.0
kafka_version=3.7.0
kotlin_faker_version=2.0.0-rc.3
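
These are the keys that `common/build.gradle.kts` resolves via `providers.gradleProperty(...).get()`, which fails fast if a key is missing. If a fallback is preferred, Gradle's `Provider` API supports one; a sketch (not in this commit):

```kotlin
// build.gradle.kts (illustrative): fall back to a pinned version when the
// gradle.properties key is absent, rather than failing on get().
val kafkaVersion = providers.gradleProperty("kafka_version").getOrElse("3.7.0")

dependencies {
    implementation("org.apache.kafka:kafka-clients:$kafkaVersion")
}
```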
Lines changed: 83 additions & 0 deletions

= Kafka Streams

This Spring Boot application consists of a couple of examples using Kafka Streams with Kotlin and the Spring Framework. The example is rather "opinionated" in that it does not delve into libraries like Spring Cloud Stream; instead, it highlights the use of dependency injection and the configuration of a Kafka Streams topology with Spring, while also providing syntactic examples from the Kotlin language.

== Example 1: Yes, Word-Count

This is a very familiar use case to anyone who has explored stream processing. Given an incoming event, the string value is lowercased and tokenized on non-word characters into a list of strings. Those strings are then grouped so that identical words land together, keyed by the word itself.
We then materialize the counts of the words to a state store and emit events with the counts to an output topic.

```kotlin
@Autowired
fun buildPipeline(streamsBuilder: StreamsBuilder) {

    val messageStream = streamsBuilder
        .stream(INPUT_TOPIC, Consumed.with(STRING_SERDE, STRING_SERDE))
        .peek { _, value -> logger.debug("*** raw value {}", value) }

    val wordCounts = messageStream
        .mapValues { v -> v.lowercase() }
        .peek { _, value -> logger.info("*** lowercase value = {}", value) }
        .flatMapValues { v -> v.split("\\W+".toRegex()) }
        .groupBy({ _, word -> word }, Grouped.with(Serdes.String(), Serdes.String()))
        .count(Materialized.`as`<String, Long>(Stores.persistentKeyValueStore(COUNTS_STORE))
            .withKeySerde(STRING_SERDE)
            .withValueSerde(LONG_SERDE))

    wordCounts.toStream().to(OUTPUT_TOPIC, Produced.with(STRING_SERDE, LONG_SERDE))
}
```
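
Not part of this commit, but a handy way to observe the topology's behavior is `kafka-streams-test-utils`. A minimal sketch, assuming hypothetical topic names and an inlined copy of the pipeline above (the real names are constants in the application):

```kotlin
import org.apache.kafka.common.serialization.LongDeserializer
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.TopologyTestDriver
import org.apache.kafka.streams.kstream.*
import java.util.Properties

fun main() {
    val inputTopic = "words-input"   // hypothetical topic names
    val outputTopic = "words-output"
    val stringSerde = Serdes.String()
    val longSerde = Serdes.Long()

    // Inline the same lowercase/split/group/count pipeline as the example above.
    val builder = StreamsBuilder()
    builder.stream(inputTopic, Consumed.with(stringSerde, stringSerde))
        .mapValues { v -> v.lowercase() }
        .flatMapValues { v -> v.split("\\W+".toRegex()) }
        .groupBy({ _, word -> word }, Grouped.with(stringSerde, stringSerde))
        .count(Materialized.with(stringSerde, longSerde))
        .toStream()
        .to(outputTopic, Produced.with(stringSerde, longSerde))

    val props = Properties().apply {
        put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count-test")
        put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234") // never contacted by the test driver
    }

    TopologyTestDriver(builder.build(), props).use { driver ->
        val input = driver.createInputTopic(inputTopic, StringSerializer(), StringSerializer())
        val output = driver.createOutputTopic(outputTopic, StringDeserializer(), LongDeserializer())

        input.pipeInput("k1", "Kafka kafka streams")

        // Typically kafka=1, kafka=2, streams=1 as each count update streams out.
        output.readKeyValuesToList().forEach { println("${it.key} -> ${it.value}") }
    }
}
```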

== Example 2: Stream-Table Join

This topology filters the members table, keeping only the ones at `PLATINUM` or `GOLD` level. It then attempts a `join` with the
checkins stream. The resulting matches are emitted to an output topic.

```kotlin
@Autowired
fun buildPipeline(streamsBuilder: StreamsBuilder) {

    val checkins = streamsBuilder.stream(CHECKIN_TOPIC, Consumed.with(Serdes.String(), checkinSerde))
        .peek { _, checkin -> logger.debug("checkin -> {}", checkin) }

    val members = streamsBuilder.table(MEMBER_TOPIC, Consumed.with(Serdes.String(), memberSerde))
        .filter { _, m -> listOf(PLATINUM, GOLD).contains(m.membershipLevel) }

    val joined = checkins.join(members, { checkin, member ->
        logger.debug("matched member {} to checkin {}", member, checkin.txnId)
        EnrichedCheckin.newBuilder()
            .setMemberId(member.id)
            .setCheckinTxnId(checkin.txnId)
            .setTxnTimestamp(checkin.txnTimestamp)
            .setMembershipLevel(member.membershipLevel)
            .build()
    }, Joined.with(Serdes.String(), checkinSerde, memberSerde))

    joined.to(ENRICHED_CHECKIN_TOPIC, Produced.with(Serdes.String(), enrichedCheckinSerde))
}
```

== Run It

=== Unit Tests

To execute the unit tests, run the following Gradle command (from the root of the project):

```bash
> ./gradlew :kafka-streams:test
```

=== Confluent Cloud

Update the Confluent Cloud assets using the Terraform steps outlined xref:../README.adoc#_confluent_cloud[here].

Now we can execute the application (with both topologies) using Gradle as follows (from the root of the project):

```bash
> ./gradlew :kafka-streams:bootRun
```

Or you can use the execution features of your IDE.

For the Word Count example, use http://localhost:8080 as the base URL of the REST endpoint for posting new words or querying the state store.
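
The controller routes aren't shown in this commit, so the exact paths live in the application code; as a purely hypothetical illustration of the shape of such calls:

```bash
# Hypothetical endpoints; check the @RestController mappings for the real paths.
curl -X POST http://localhost:8080/words \
     -H "Content-Type: text/plain" \
     -d "spring into kafka streams"

curl http://localhost:8080/counts/kafka
```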
