
Commit c488fc8

Add a commit drain & stage to Kafka (#383)
1 parent ef0b8ed commit c488fc8

15 files changed: +417 additions, −94 deletions

core/src/main/scala/ox/channels/actor.scala

Lines changed: 6 additions & 2 deletions
@@ -15,15 +15,19 @@ object Actor:
     * cause the actor's channel to be closed with an error, and are propagated to the enclosing scope.
     *
     * The actor's mailbox (incoming channel) will have a capacity as specified by the [[BufferCapacity]] in scope.
+    *
+    * @param close
+    *   An optional callback that will be called uninterruptedly before the actor closes.
     */
   def create[T](logic: T, close: Option[T => Unit] = None)(using ox: Ox, sc: BufferCapacity): ActorRef[T] =
     val c = BufferCapacity.newChannel[T => Unit]
     val ref = ActorRef(c)
     forkDiscard {
       try
         forever {
-          val m = c.receive()
-          try m(logic)
+          try
+            val m = c.receive()
+            m(logic)
         catch
           case t: Throwable =>
             c.error(t)
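For illustration, a minimal sketch of how the new `close` callback could be used (not part of this commit; the `Db` trait, its methods, and the presence of a default `BufferCapacity` given are assumptions):

```scala
import ox.*
import ox.channels.*

trait Db:
  def persist(s: String): Unit
  def shutdown(): Unit

supervised:
  val logic: Db = new Db:
    def persist(s: String): Unit = println(s"persisting: $s")
    def shutdown(): Unit = println("releasing db resources")

  // the close callback runs uninterruptedly once the actor's channel is done,
  // e.g. when this concurrency scope ends
  val db: ActorRef[Db] = Actor.create(logic, close = Some(_.shutdown()))
  db.tell(_.persist("some data"))
```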

core/src/main/scala/ox/flow/FlowOps.scala

Lines changed: 2 additions & 1 deletion
@@ -28,6 +28,7 @@ import ox.unsupervised
 import java.util.concurrent.Semaphore
 import scala.concurrent.duration.DurationLong
 import scala.concurrent.duration.FiniteDuration
+import scala.util.control.ControlThrowable

 class FlowOps[+T]:
   outer: Flow[T] =>
@@ -271,7 +272,7 @@ class FlowOps[+T]:

       FlowEmit.channelToEmit(results, emit)

-  private val abortTake = new Exception("abort take")
+  private val abortTake = new ControlThrowable("abort take") {}

   /** Takes the first `n` elements from this flow and emits them. If the flow completes before emitting `n` elements, the returned flow
     * completes as well.
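A small aside on why `ControlThrowable` is a better fit here than a plain `Exception` (an illustrative snippet, not from this commit): `scala.util.control.NonFatal` does not match `ControlThrowable`, so user-level `catch NonFatal` handlers won't accidentally swallow the internal abort signal used by `take`.

```scala
import scala.util.control.{ControlThrowable, NonFatal}

val abort = new ControlThrowable("abort take") {}

// NonFatal matches ordinary exceptions, but treats ControlThrowable as fatal,
// so catch-NonFatal blocks let the control-flow signal propagate
assert(NonFatal(new RuntimeException("boom")))
assert(!NonFatal(abort))
```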

core/src/test/scala/ox/channels/ActorTest.scala

Lines changed: 11 additions & 0 deletions
@@ -80,4 +80,15 @@ class ActorTest extends AnyFlatSpec with Matchers:

     thrown.getMessage shouldBe "boom"
   }
+
+  it should "throw a channel closed exception when the actor's scope becomes closed" in {
+    val actor = supervised:
+      val logic = new Test1:
+        override def f(x: Int): Long = 10
+
+      Actor.create(logic)
+    // when the scope ends, the actor's fork is interrupted
+
+    an[ChannelClosedException] should be thrownBy actor.ask(_.f(5))
+  }
 end ActorTest

doc/integrations/kafka.md

Lines changed: 64 additions & 22 deletions
@@ -7,9 +7,16 @@ Dependency:
 ```
 
 `Flow`s which read from a Kafka topic, mapping stages and drains which publish to Kafka topics are available through
-the `KafkaFlow`, `KafkaStage` and `KafkaDrain` objects. In all cases either a manually constructed instance of a
-`KafkaProducer` / `KafkaConsumer` is needed, or `ProducerSettings` / `ConsumerSetttings` need to be provided with the
-bootstrap servers, consumer group id, key / value serializers, etc.
+the `KafkaFlow`, `KafkaStage` and `KafkaDrain` objects.
+
+In all cases kafka producers and consumers can be provided:
+* by manually creating (and closing) an instance of a `KafkaProducer` / `KafkaConsumer`
+* through a `ProducerSettings` / `ConsumerSettings`, with the bootstrap servers, consumer group id, key/value
+  serializers, etc. The lifetime is then managed by the flow operators.
+* through a thread-safe wrapper on a consumer (`ActorRef[KafkaConsumerWrapper[K, V]]`), for which the lifetime is bound
+  to the current concurrency scope
+
+## Reading from Kafka
 
 To read from a Kafka topic, use:
 
@@ -25,6 +32,8 @@ val source = KafkaFlow.subscribe(settings, topic)
   .runForeach { (msg: ReceivedMessage[String, String]) => ??? }
 ```
 
+## Publishing to Kafka
+
 To publish data to a Kafka topic:
 
 ```scala mdoc:compile-only
@@ -40,6 +49,25 @@ Flow
   .pipe(KafkaDrain.runPublish(settings))
 ```
 
+To publish data as a mapping stage:
+
+```scala mdoc:compile-only
+import ox.flow.Flow
+import ox.kafka.ProducerSettings
+import ox.kafka.KafkaStage.*
+import org.apache.kafka.clients.producer.{ProducerRecord, RecordMetadata}
+
+val settings = ProducerSettings.default.bootstrapServers("localhost:9092")
+val metadatas: Flow[RecordMetadata] = Flow
+  .fromIterable(List("a", "b", "c"))
+  .map(msg => ProducerRecord[String, String]("my_topic", msg))
+  .mapPublish(settings)
+
+// process & run the metadatas flow further
+```
+
+## Reading & publishing to Kafka with offset commits
+
 Quite often data to be published to a topic (`topic1`) is computed basing on data received from another topic
 (`topic2`). In such a case, it's possible to commit messages from `topic2`, after the messages to `topic1` are
 successfully published.
@@ -63,7 +91,7 @@ computed. For example:
 ```scala mdoc:compile-only
 import ox.kafka.{ConsumerSettings, KafkaDrain, KafkaFlow, ProducerSettings, SendPacket}
 import ox.kafka.ConsumerSettings.AutoOffsetReset
-import ox.pipe
+import ox.*
 import org.apache.kafka.clients.producer.ProducerRecord
 
 val consumerSettings = ConsumerSettings.default("my_group")
@@ -72,29 +100,43 @@ val producerSettings = ProducerSettings.default.bootstrapServers("localhost:9092
 val sourceTopic = "source_topic"
 val destTopic = "dest_topic"
 
-KafkaFlow
-  .subscribe(consumerSettings, sourceTopic)
-  .map(in => (in.value.toLong * 2, in))
-  .map((value, original) =>
-    SendPacket(ProducerRecord[String, String](destTopic, value.toString), original))
-  .pipe(KafkaDrain.runPublishAndCommit(producerSettings))
+supervised:
+  // the consumer is shared between the subscribe & offset stages
+  // its lifetime is bound to the current concurrency scope
+  val consumer = consumerSettings.toThreadSafeConsumerWrapper
+  KafkaFlow
+    .subscribe(consumer, sourceTopic)
+    .map(in => (in.value.toLong * 2, in))
+    .map((value, original) =>
+      SendPacket(ProducerRecord[String, String](destTopic, value.toString), original))
+    .pipe(KafkaDrain.runPublishAndCommit(producerSettings, consumer))
 ```
 
 The offsets are committed every second in a background process.
 
-To publish data as a mapping stage:
+## Reading from Kafka, processing data & committing offsets
+
+Offsets can also be committed after the data has been processed, without producing any records to write to a topic.
+For that, we can use the `runCommit` drain, or the `mapCommit` stage, both of which work with a `Flow[CommitPacket]`:
 
 ```scala mdoc:compile-only
-import ox.flow.Flow
-import ox.kafka.ProducerSettings
-import ox.kafka.KafkaStage.*
-import org.apache.kafka.clients.producer.{ProducerRecord, RecordMetadata}
+import ox.kafka.{ConsumerSettings, KafkaDrain, KafkaFlow, CommitPacket}
+import ox.kafka.ConsumerSettings.AutoOffsetReset
+import ox.*
 
-val settings = ProducerSettings.default.bootstrapServers("localhost:9092")
-val metadatas: Flow[RecordMetadata] = Flow
-  .fromIterable(List("a", "b", "c"))
-  .map(msg => ProducerRecord[String, String]("my_topic", msg))
-  .mapPublish(settings)
+val consumerSettings = ConsumerSettings.default("my_group")
+  .bootstrapServers("localhost:9092").autoOffsetReset(AutoOffsetReset.Earliest)
+val sourceTopic = "source_topic"
 
-// process & run the metadatas flow further
-```
+supervised:
+  // the consumer is shared between the subscribe & offset stages
+  // its lifetime is bound to the current concurrency scope
+  val consumer = consumerSettings.toThreadSafeConsumerWrapper
+  KafkaFlow
+    .subscribe(consumer, sourceTopic)
+    .mapPar(10) { in =>
+      // process the message, e.g. send an HTTP request
+      CommitPacket(in)
+    }
+    .pipe(KafkaDrain.runCommit(consumer))
+```
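As a complement to the documentation above, a rough sketch of the `mapCommit` stage that is mentioned alongside `runCommit` (not part of this diff; what `mapCommit` emits downstream is an assumption, so the result is simply drained here, and a default `BufferCapacity` given is assumed):

```scala
import ox.*
import ox.kafka.{CommitPacket, ConsumerSettings, KafkaFlow}
import ox.kafka.ConsumerSettings.AutoOffsetReset
import ox.kafka.KafkaStage.*

val consumerSettings = ConsumerSettings.default("my_group")
  .bootstrapServers("localhost:9092").autoOffsetReset(AutoOffsetReset.Earliest)

supervised:
  // as in the examples above, the consumer's lifetime is bound to this scope
  val consumer = consumerSettings.toThreadSafeConsumerWrapper
  KafkaFlow
    .subscribe(consumer, "source_topic")
    .map { in =>
      // process the message, then mark it for committing
      CommitPacket(in)
    }
    .mapCommit(consumer) // commit offsets as a stage; whatever it emits downstream is drained here
    .runDrain()
```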

kafka/src/main/scala/ox/kafka/ConsumerSettings.scala

Lines changed: 15 additions & 2 deletions
@@ -1,7 +1,11 @@
 package ox.kafka
 
-import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
-import org.apache.kafka.common.serialization.{Deserializer, StringDeserializer}
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.common.serialization.Deserializer
+import org.apache.kafka.common.serialization.StringDeserializer
+import ox.Ox
+import ox.channels.ActorRef
 import ox.kafka.ConsumerSettings.AutoOffsetReset
 
 import java.util.Properties
@@ -32,7 +36,16 @@ case class ConsumerSettings[K, V](
     props
   end toProperties
 
+  /** Using these settings, create a new open [[KafkaConsumer]]. The consumer is not thread-safe, and should not be used concurrently, and
+    * has to be closed manually.
+    */
   def toConsumer: KafkaConsumer[K, V] = KafkaConsumer(toProperties, keyDeserializer, valueDeserializer)
+
+  /** Using these settings, create a thread-safe wrapper on top of a new open [[KafkaConsumer]]. The wrapper serializes calls using an
+    * actor. The actor is created within the current concurrency scope and will be closed (along with the consumer) when the scope ends.
+    */
+  def toThreadSafeConsumerWrapper(using Ox): ActorRef[KafkaConsumerWrapper[K, V]] =
+    KafkaConsumerWrapper(toConsumer, closeWhenComplete = true)
 end ConsumerSettings
 
 object ConsumerSettings:

kafka/src/main/scala/ox/kafka/KafkaConsumerWrapper.scala

Lines changed: 4 additions & 1 deletion
@@ -16,6 +16,9 @@ trait KafkaConsumerWrapper[K, V]:
 object KafkaConsumerWrapper:
   private val logger = LoggerFactory.getLogger(classOf[KafkaConsumerWrapper.type])
 
+  /** Create a thread-safe wrapper on top of a [[KafkaConsumer]], which serializes calls using an actor. The actor is created within the
+    * current concurrency scope and will be closed when the scope ends if the `closeWhenComplete` flag is `true`.
+    */
   def apply[K, V](consumer: KafkaConsumer[K, V], closeWhenComplete: Boolean)(using Ox): ActorRef[KafkaConsumerWrapper[K, V]] =
     val logic = new KafkaConsumerWrapper[K, V]:
       override def subscribe(topics: Seq[String]): Unit =
@@ -41,7 +44,7 @@ object KafkaConsumerWrapper:
 
     def close(wrapper: KafkaConsumerWrapper[K, V]): Unit = if closeWhenComplete then
       logger.debug("Closing the Kafka consumer")
-      uninterruptible(consumer.close())
+      consumer.close()
 
     Actor.create(logic, Some(close))
   end apply
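A possible sketch of using the wrapper with a manually managed consumer and `closeWhenComplete = false` (not from this commit; it assumes `ConsumerSettings.default` yields a `String`-keyed/valued consumer and that a default `BufferCapacity` given is in scope):

```scala
import ox.*
import ox.kafka.{ConsumerSettings, KafkaConsumerWrapper, KafkaFlow}

val settings = ConsumerSettings.default("my_group").bootstrapServers("localhost:9092")

// the consumer is created and closed manually, outside of any concurrency scope
val consumer = settings.toConsumer
try
  supervised:
    // closeWhenComplete = false: the wrapper's actor ends with the scope,
    // but closing the consumer itself is left to the caller
    val wrapper = KafkaConsumerWrapper(consumer, closeWhenComplete = false)
    KafkaFlow
      .subscribe(wrapper, "my_topic")
      .take(10)
      .runForeach(msg => println(msg.value))
finally consumer.close()
```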

kafka/src/main/scala/ox/kafka/KafkaDrain.scala

Lines changed: 19 additions & 7 deletions
@@ -52,20 +52,32 @@ object KafkaDrain:
 
   /** @return
     *   A drain, which consumes all packets emitted by the provided [[Flow]]. For each packet, first all `send` messages (producer records)
-    *   are sent. Then, all `commit` messages (consumer records) up to their offsets are committed.
+    *   are sent, using a producer created with the given `producerSettings`. Then, all `commit` messages (consumer records) up to their
+    *   offsets are committed, using the given `consumer`.
     */
-  def runPublishAndCommit[K, V](producerSettings: ProducerSettings[K, V])(using BufferCapacity): Flow[SendPacket[K, V]] => Unit =
-    flow => runPublishAndCommit(producerSettings.toProducer, closeWhenComplete = true)(flow)
+  def runPublishAndCommit[K, V](producerSettings: ProducerSettings[K, V], consumer: ActorRef[KafkaConsumerWrapper[K, V]])(using
+      BufferCapacity
+  ): Flow[SendPacket[K, V]] => Unit =
+    flow => runPublishAndCommit(producerSettings.toProducer, consumer, closeWhenComplete = true)(flow)
 
   /** @param producer
     *   The producer that is used to send messages.
     * @return
     *   A drain, which consumes all packets emitted by the provided [[Flow]]. For each packet, first all `send` messages (producer records)
-    *   are sent. Then, all `commit` messages (consumer records) up to their offsets are committed.
+    *   are sent, using the given `producer`. Then, all `commit` messages (consumer records) up to their offsets are committed, using the
+    *   given `consumer`.
     */
-  def runPublishAndCommit[K, V](producer: KafkaProducer[K, V], closeWhenComplete: Boolean)(using
-      BufferCapacity
+  def runPublishAndCommit[K, V](producer: KafkaProducer[K, V], consumer: ActorRef[KafkaConsumerWrapper[K, V]], closeWhenComplete: Boolean)(
+      using BufferCapacity
   ): Flow[SendPacket[K, V]] => Unit = flow =>
     import KafkaStage.*
-    flow.mapPublishAndCommit(producer, closeWhenComplete).runDrain()
+    flow.mapPublishAndCommit(producer, consumer, closeWhenComplete).runDrain()
+
+  /** @return
+    *   A drain, which consumes all commit packets emitted by the provided [[Flow]]. For each packet, all `commit` messages (consumer
+    *   records) are committed: for each topic-partition, up to the highest observed offset, using the given `consumer`.
+    */
+  def runCommit[K, V](consumer: ActorRef[KafkaConsumerWrapper[K, V]])(using BufferCapacity): Flow[CommitPacket] => Unit = flow =>
+    import KafkaStage.*
+    flow.mapCommit(consumer).runDrain()
 end KafkaDrain
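To illustrate the manually-managed-producer variant of the new `runPublishAndCommit` signature, a hedged sketch (not from this commit; the topic names, settings, and the default `BufferCapacity` given are assumptions):

```scala
import ox.*
import ox.kafka.{ConsumerSettings, KafkaDrain, KafkaFlow, ProducerSettings, SendPacket}
import org.apache.kafka.clients.producer.ProducerRecord

val consumerSettings = ConsumerSettings.default("my_group").bootstrapServers("localhost:9092")
val producerSettings = ProducerSettings.default.bootstrapServers("localhost:9092")

// the producer is created and closed by the caller, hence closeWhenComplete = false
val producer = producerSettings.toProducer
try
  supervised:
    val consumer = consumerSettings.toThreadSafeConsumerWrapper
    KafkaFlow
      .subscribe(consumer, "source_topic")
      .map(in => SendPacket(ProducerRecord[String, String]("dest_topic", in.value), in))
      .pipe(KafkaDrain.runPublishAndCommit(producer, consumer, closeWhenComplete = false))
finally producer.close()
```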

kafka/src/main/scala/ox/kafka/KafkaFlow.scala

Lines changed: 23 additions & 6 deletions
@@ -1,9 +1,12 @@
 package ox.kafka
 
-import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.slf4j.LoggerFactory
 import ox.*
+import ox.channels.ActorRef
 import ox.flow.Flow
+import ox.flow.FlowEmit
 
 object KafkaFlow:
   private val logger = LoggerFactory.getLogger(classOf[KafkaFlow.type])
@@ -20,10 +23,24 @@ object KafkaFlow:
     Flow.usingEmit: emit =>
       supervised:
         val kafkaConsumerActor = KafkaConsumerWrapper(kafkaConsumer, closeWhenComplete)
-        kafkaConsumerActor.tell(_.subscribe(topic :: otherTopics.toList))
-        forever {
-          val records = kafkaConsumerActor.ask(_.poll())
-          records.forEach(r => emit(ReceivedMessage(kafkaConsumerActor, r)))
-        }.tapException(logger.error("Exception when polling for records", _))
+        doSubscribe(kafkaConsumerActor, topic, otherTopics*)(emit)
+
+  def subscribe[K, V](
+      kafkaConsumerActor: ActorRef[KafkaConsumerWrapper[K, V]],
+      topic: String,
+      otherTopics: String*
+  ): Flow[ReceivedMessage[K, V]] =
+    Flow.usingEmit: emit =>
+      doSubscribe(kafkaConsumerActor, topic, otherTopics*)(emit)
+
+  private def doSubscribe[K, V](kafkaConsumerActor: ActorRef[KafkaConsumerWrapper[K, V]], topic: String, otherTopics: String*)(
+      emit: FlowEmit[ReceivedMessage[K, V]]
+  ): Unit =
+    kafkaConsumerActor.tell(_.subscribe(topic :: otherTopics.toList))
+    forever {
+      val records = kafkaConsumerActor.ask(_.poll())
+      records.forEach(r => emit(ReceivedMessage(r)))
+    }.tapException(logger.error("Exception when polling for records", _))
+  end doSubscribe
 
 end KafkaFlow
