Skip to content

Commit 0d82565

Browse files
committed
Implements a KafkaConsumerResource to solve consumer already closed issue
Reproducible test KafkaConsumerResourceSpec Scaladocs Revert unwanted changes a
1 parent 2117c09 commit 0d82565

File tree

10 files changed

+348
-195
lines changed

10 files changed

+348
-195
lines changed

.bsp/sbt.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
{"name":"sbt","version":"1.4.7","bspVersion":"2.0.0-M5","languages":["scala"],"argv":["/Library/Java/JavaVirtualMachines/jdk1.8.0_221.jdk/Contents/Home/jre/bin/java","-Xms100m","-Xmx100m","-classpath","/usr/local/Cellar/sbt/1.3.3/libexec/bin/sbt-launch.jar","xsbt.boot.Boot","-bsp"]}

build.sbt

Lines changed: 10 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,8 @@ lazy val warnUnusedImport = Seq(
3131

3232
lazy val sharedSettings = warnUnusedImport ++ Seq(
3333
organization := "io.monix",
34-
scalaVersion := "2.12.14",
35-
crossScalaVersions := Seq("2.11.12", "2.12.14", "2.13.6"),
36-
34+
scalaVersion := "2.12.15",
35+
crossScalaVersions := Seq("2.11.12", "2.12.15", "2.13.6"),
3736
scalacOptions ++= Seq(
3837
// warnings
3938
"-unchecked", // able additional warnings where generated code depends on assumptions
@@ -84,11 +83,11 @@ lazy val sharedSettings = warnUnusedImport ++ Seq(
8483
scalacOptions ++= Seq(
8584
// Turns all warnings into errors ;-)
8685
// TODO: enable after fixing deprecations for Scala 2.13
87-
"-Xfatal-warnings",
86+
//"-Xfatal-warnings",
8887
// Enables linter options
8988
"-Xlint:adapted-args", // warn if an argument list is modified to match the receiver
90-
"-Xlint:nullary-unit", // warn when nullary methods return Unit
91-
"-Xlint:nullary-override", // warn when non-nullary `def f()' overrides nullary `def f'
89+
//"-Xlint:nullary-unit", // warn when nullary methods return Unit
90+
//"-Xlint:nullary-override", // warn when non-nullary `def f()' overrides nullary `def f'
9291
"-Xlint:infer-any", // warn when a type argument is inferred to be `Any`
9392
"-Xlint:missing-interpolator", // a string literal appears to be missing an interpolator id
9493
"-Xlint:doc-detached", // a ScalaDoc comment appears to be detached from its element
@@ -197,8 +196,9 @@ lazy val commonDependencies = Seq(
197196
// For testing ...
198197
"ch.qos.logback" % "logback-classic" % "1.2.3" % "test",
199198
"org.scalatest" %% "scalatest" % "3.0.9" % "test",
200-
"org.scalacheck" %% "scalacheck" % "1.15.2" % "test"
201-
)
199+
"org.scalacheck" %% "scalacheck" % "1.15.2" % "test",
200+
"io.github.embeddedkafka" %% "embedded-kafka" % "2.4.1" force()
201+
),
202202
)
203203

204204
lazy val monixKafka = project.in(file("."))
@@ -212,10 +212,6 @@ lazy val kafka1x = project.in(file("kafka-1.0.x"))
212212
.settings(mimaSettings("monix-kafka-1x"))
213213
.settings(
214214
name := "monix-kafka-1x",
215-
libraryDependencies ++= {
216-
if (!(scalaVersion.value startsWith "2.13")) Seq("net.manub" %% "scalatest-embedded-kafka" % "1.0.0" % "test" exclude ("log4j", "log4j"))
217-
else Seq.empty[ModuleID]
218-
},
219215
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "1.0.2" exclude("org.slf4j","slf4j-log4j12") exclude("log4j", "log4j")
220216
)
221217

@@ -225,10 +221,6 @@ lazy val kafka11 = project.in(file("kafka-0.11.x"))
225221
.settings(mimaSettings("monix-kafka-11"))
226222
.settings(
227223
name := "monix-kafka-11",
228-
libraryDependencies ++= {
229-
if (!(scalaVersion.value startsWith "2.13")) Seq("net.manub" %% "scalatest-embedded-kafka" % "1.0.0" % "test" exclude ("log4j", "log4j"))
230-
else Seq.empty[ModuleID]
231-
},
232224
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.11.0.3" exclude("org.slf4j","slf4j-log4j12") exclude("log4j", "log4j")
233225
)
234226

@@ -237,12 +229,8 @@ lazy val kafka10 = project.in(file("kafka-0.10.x"))
237229
.settings(commonDependencies)
238230
.settings(mimaSettings("monix-kafka-10"))
239231
.settings(
240-
name := "monix-kafka-10",
241-
libraryDependencies ++= {
242-
if (!(scalaVersion.value startsWith "2.13")) Seq("net.manub" %% "scalatest-embedded-kafka" % "0.16.0" % "test" exclude ("log4j", "log4j"))
243-
else Seq.empty[ModuleID]
244-
},
245-
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.10.2.2" exclude("org.slf4j","slf4j-log4j12") exclude("log4j", "log4j")
232+
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.10.2.2" force(),
233+
//dependencyOverrides += "org.apache.kafka" % "kafka-clients" % "0.10.2.2" // exclude("org.slf4j","slf4j-log4j12") exclude("log4j", "log4j")
246234
)
247235

248236
lazy val kafka9 = project.in(file("kafka-0.9.x"))

kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerObservable.scala

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import scala.util.matching.Regex
3838
trait KafkaConsumerObservable[K, V, Out] extends Observable[Out] {
3939
protected def config: KafkaConsumerConfig
4040
protected def consumer: Task[Consumer[K, V]]
41+
protected val shouldClose: Boolean
4142

4243
/** Creates a task that polls the source, then feeds the downstream
4344
* subscriber, returning the resulting acknowledgement
@@ -90,7 +91,7 @@ trait KafkaConsumerObservable[K, V, Out] extends Observable[Out] {
9091
private def cancelTask(consumer: Consumer[K, V]): Task[Unit] = {
9192
// Forced asynchronous boundary
9293
val cancelTask = Task.evalAsync {
93-
consumer.synchronized(blocking(consumer.close()))
94+
if (shouldClose) { consumer.synchronized(blocking(consumer.close())) }
9495
}
9596

9697
// By applying memoization, we are turning this
@@ -113,10 +114,11 @@ object KafkaConsumerObservable {
113114
* `org.apache.kafka.clients.consumer.KafkaConsumer`
114115
* instance to use for consuming from Kafka
115116
*/
117+
@deprecated("Use `KafkaConsumerResource`.", "1.0.0-RC8")
116118
def apply[K, V](
117119
cfg: KafkaConsumerConfig,
118120
consumer: Task[Consumer[K, V]]): KafkaConsumerObservable[K, V, ConsumerRecord[K, V]] =
119-
new KafkaConsumerObservableAutoCommit[K, V](cfg, consumer)
121+
new KafkaConsumerObservableAutoCommit[K, V](cfg, consumer, true)
120122

121123
/** Builds a [[KafkaConsumerObservable]] instance.
122124
*
@@ -126,6 +128,7 @@ object KafkaConsumerObservable {
126128
*
127129
* @param topics is the list of Kafka topics to subscribe to.
128130
*/
131+
@deprecated("Use `KafkaConsumerResource`.", "1.0.0-RC8")
129132
def apply[K, V](cfg: KafkaConsumerConfig, topics: List[String])(implicit
130133
K: Deserializer[K],
131134
V: Deserializer[V]): KafkaConsumerObservable[K, V, ConsumerRecord[K, V]] = {
@@ -142,6 +145,7 @@ object KafkaConsumerObservable {
142145
*
143146
* @param topicsRegex is the pattern of Kafka topics to subscribe to.
144147
*/
148+
@deprecated("Use `KafkaConsumerResource`.", "1.0.0-RC8")
145149
def apply[K, V](cfg: KafkaConsumerConfig, topicsRegex: Regex)(implicit
146150
K: Deserializer[K],
147151
V: Deserializer[V]): KafkaConsumerObservable[K, V, ConsumerRecord[K, V]] = {
@@ -173,12 +177,13 @@ object KafkaConsumerObservable {
173177
* `org.apache.kafka.clients.consumer.KafkaConsumer`
174178
* instance to use for consuming from Kafka
175179
*/
180+
@deprecated("Use `KafkaConsumerResource`.", "1.0.0-RC8")
176181
def manualCommit[K, V](
177182
cfg: KafkaConsumerConfig,
178183
consumer: Task[Consumer[K, V]]): KafkaConsumerObservable[K, V, CommittableMessage[K, V]] = {
179184

180185
val manualCommitConfig = cfg.copy(observableCommitOrder = ObservableCommitOrder.NoAck, enableAutoCommit = false)
181-
new KafkaConsumerObservableManualCommit[K, V](manualCommitConfig, consumer)
186+
new KafkaConsumerObservableManualCommit[K, V](manualCommitConfig, consumer, shouldClose = true)
182187
}
183188

184189
/** Builds a [[KafkaConsumerObservable]] instance with ability to manual commit offsets
@@ -202,6 +207,7 @@ object KafkaConsumerObservable {
202207
*
203208
* @param topics is the list of Kafka topics to subscribe to.
204209
*/
210+
@deprecated("Use `KafkaConsumerResource`.", "1.0.0-RC8")
205211
def manualCommit[K, V](cfg: KafkaConsumerConfig, topics: List[String])(implicit
206212
K: Deserializer[K],
207213
V: Deserializer[V]): KafkaConsumerObservable[K, V, CommittableMessage[K, V]] = {
@@ -231,6 +237,7 @@ object KafkaConsumerObservable {
231237
*
232238
* @param topicsRegex is the pattern of Kafka topics to subscribe to.
233239
*/
240+
@deprecated("Use `KafkaConsumerResource`.", "1.0.0-RC8")
234241
def manualCommit[K, V](cfg: KafkaConsumerConfig, topicsRegex: Regex)(implicit
235242
K: Deserializer[K],
236243
V: Deserializer[V]): KafkaConsumerObservable[K, V, CommittableMessage[K, V]] = {

kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerObservableAutoCommit.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ import scala.util.{Failure, Success}
3434
*/
3535
final class KafkaConsumerObservableAutoCommit[K, V] private[kafka] (
3636
override protected val config: KafkaConsumerConfig,
37-
override protected val consumer: Task[Consumer[K, V]])
37+
override protected val consumer: Task[Consumer[K, V]],
38+
override protected val shouldClose: Boolean)
3839
extends KafkaConsumerObservable[K, V, ConsumerRecord[K, V]] {
3940

4041
/* Based on the [[KafkaConsumerConfig.observableCommitType]] it

kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerObservableManualCommit.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,8 @@ import scala.jdk.CollectionConverters._
3636
*/
3737
final class KafkaConsumerObservableManualCommit[K, V] private[kafka] (
3838
override protected val config: KafkaConsumerConfig,
39-
override protected val consumer: Task[Consumer[K, V]])
39+
override protected val consumer: Task[Consumer[K, V]],
40+
override protected val shouldClose: Boolean)
4041
extends KafkaConsumerObservable[K, V, CommittableMessage[K, V]] {
4142

4243
// Caching value to save CPU cycles
Lines changed: 203 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
/*
2+
* Copyright (c) 2014-2021 by The Monix Project Developers.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package monix.kafka
18+
19+
import cats.effect.Resource
20+
import monix.eval.Task
21+
import monix.kafka.KafkaConsumerObservable.createConsumer
22+
import monix.kafka.config.ObservableCommitOrder
23+
import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecord, KafkaConsumer}
24+
25+
import scala.concurrent.blocking
26+
import scala.util.matching.Regex
27+
28+
/** Exposes an `Observable` that consumes a Kafka stream by
29+
* means of a Kafka Consumer client.
30+
*
31+
* In order to get initialized, it needs a configuration. See the
32+
* [[KafkaConsumerConfig]] needed and see `monix/kafka/default.conf`,
33+
* (in the resource files) that is exposing all default values.
34+
*/
35+
object KafkaConsumerResource {
36+
37+
38+
/** A [[Resource]] that acquires a [[KafkaConsumer]] used
39+
* to build a [[KafkaConsumerObservableAutoCommit]] instance,
40+
* that will be released after it's usage.
41+
*
42+
* @note The consumer will act consequently depending on the
43+
* [[ObservableCommitOrder]] that was chosen from configuration.
44+
* Which can be configured from the key `monix.observable.commit.order`.
45+
*
46+
* @param cfg is the [[KafkaConsumerConfig]] needed for initializing the
47+
* consumer; also make sure to see `monix/kafka/default.conf` for
48+
* the default values being used.
49+
* @param consumer is a factory for the
50+
* `org.apache.kafka.clients.consumer.KafkaConsumer`
51+
* instance to use for consuming from Kafka
52+
*/
53+
def apply[K, V](
54+
cfg: KafkaConsumerConfig,
55+
consumer: Task[Consumer[K, V]]): Resource[Task, KafkaConsumerObservable[K, V, ConsumerRecord[K, V]]]= {
56+
for {
57+
consumer <- Resource.make(consumer){ consumer =>
58+
Task.evalAsync(consumer.synchronized{ blocking(consumer.close())})
59+
}
60+
consumerObservable <- Resource.liftF(Task(new KafkaConsumerObservableAutoCommit[K, V](cfg, Task.pure(consumer), shouldClose = false)))
61+
} yield consumerObservable
62+
}
63+
64+
/** A [[Resource]] that acquires a [[KafkaConsumer]] used
65+
* to build a [[KafkaConsumerObservableAutoCommit]] instance,
66+
* that will be released after it's usage.
67+
*
68+
* @note The consumer will act consequently depending on the
69+
* [[ObservableCommitOrder]] that was chosen from configuration.
70+
* Which can be configured from the key `monix.observable.commit.order`.
71+
*
72+
* @param cfg is the [[KafkaConsumerConfig]] needed for initializing the
73+
* consumer; also make sure to see `monix/kafka/default.conf` for
74+
* the default values being used.
75+
* @param topics is the list of Kafka topics to subscribe to.
76+
*/
77+
def apply[K, V](cfg: KafkaConsumerConfig, topics: List[String])(implicit
78+
K: Deserializer[K],
79+
V: Deserializer[V]): Resource[Task, KafkaConsumerObservable[K, V, ConsumerRecord[K, V]]] = {
80+
val consumer = createConsumer[K, V](cfg, topics)
81+
apply(cfg, consumer)
82+
}
83+
84+
/** A [[Resource]] that acquires a [[KafkaConsumer]] used
85+
* to build a [[KafkaConsumerObservableAutoCommit]] instance,
86+
* that will be released after it's usage.
87+
*
88+
* @note The consumer will act consequently depending on the
89+
* [[ObservableCommitOrder]] that was chosen from configuration.
90+
* Which can be configured from the key `monix.observable.commit.order`.
91+
*
92+
* @param cfg is the [[KafkaConsumerConfig]] needed for initializing the
93+
* consumer; also make sure to see `monix/kafka/default.conf` for
94+
* the default values being used.
95+
* @param topicsRegex is the pattern of Kafka topics to subscribe to.
96+
*/
97+
def apply[K, V](cfg: KafkaConsumerConfig, topicsRegex: Regex)(implicit
98+
K: Deserializer[K],
99+
V: Deserializer[V]): Resource[Task, KafkaConsumerObservable[K, V, ConsumerRecord[K, V]]] = {
100+
101+
val consumer = createConsumer[K, V](cfg, topicsRegex)
102+
apply(cfg, consumer)
103+
}
104+
105+
/**
106+
* A [[Resource]] that acquires a [[KafkaConsumer]] used
107+
* to build a [[KafkaConsumerObservableManualCommit]] instance,
108+
* which provides the ability to manual commit offsets and
109+
* forcibly disables auto commits in configuration.
110+
* Such instances emit [[CommittableMessage]] instead of Kafka's [[ConsumerRecord]].
111+
*
112+
* ==Example==
113+
* {{{
114+
* KafkaConsumerResource.manualCommit[String,String](consumerCfg, List(topicName)).use{ committableMessages =>
115+
* committableMessages.map(message => message.record.value() -> message.committableOffset)
116+
* .mapEval { case (value, offset) => performBusinessLogic(value).map(_ => offset) }
117+
* .bufferTimedAndCounted(1.second, 1000)
118+
* .mapEval(offsets => CommittableOffsetBatch(offsets).commitAsync())
119+
* .completedL
120+
* }}}
121+
*
122+
* @param cfg is the [[KafkaConsumerConfig]] needed for initializing the
123+
* consumer; also make sure to see `monix/kafka/default.conf` for
124+
* the default values being used. Auto commit will disabled and
125+
* observable commit order will turned to [[monix.kafka.config.ObservableCommitOrder.NoAck NoAck]] forcibly!
126+
*
127+
* @param consumer is a factory for the
128+
* `org.apache.kafka.clients.consumer.KafkaConsumer`
129+
* instance to use for consuming from Kafka
130+
*/
131+
def manualCommit[K, V](
132+
cfg: KafkaConsumerConfig,
133+
consumer: Task[Consumer[K, V]]): Resource[Task, KafkaConsumerObservable[K, V, CommittableMessage[K, V]]] = {
134+
for {
135+
consumer <- Resource.make(consumer){ consumer =>
136+
Task.evalAsync(consumer.synchronized{ blocking(consumer.close())})
137+
}
138+
manualCommitConf = cfg.copy(observableCommitOrder = ObservableCommitOrder.NoAck, enableAutoCommit = false)
139+
consumerObservable <- Resource.liftF(Task(new KafkaConsumerObservableManualCommit[K, V](manualCommitConf, Task.pure(consumer), shouldClose = false)))
140+
} yield consumerObservable
141+
}
142+
143+
/** Builds a [[KafkaConsumerObservable]] instance with ability to manual commit offsets
144+
* and forcibly disables auto commits in configuration.
145+
* Such instances emit [[CommittableMessage]] instead of Kafka's ConsumerRecord.
146+
*
147+
* ==Example==
148+
* {{{
149+
* KafkaConsumerResource.manualCommit[String,String](consumerCfg, List(topicName)).use{ committableMessages =>
150+
* committableMessages.map(message => message.record.value() -> message.committableOffset)
151+
* .mapEval { case (value, offset) => performBusinessLogic(value).map(_ => offset) }
152+
* .bufferTimedAndCounted(1.second, 1000)
153+
* .mapEval(offsets => CommittableOffsetBatch(offsets).commitSync())
154+
* .completedL
155+
* }
156+
* }}}
157+
*
158+
* @param cfg is the [[KafkaConsumerConfig]] needed for initializing the
159+
* consumer; also make sure to see `monix/kafka/default.conf` for
160+
* the default values being used. Auto commit will disabled and
161+
* observable commit order will turned to [[monix.kafka.config.ObservableCommitOrder.NoAck NoAck]] forcibly!
162+
*
163+
* @param topics is the list of Kafka topics to subscribe to.
164+
*/
165+
def manualCommit[K, V](cfg: KafkaConsumerConfig, topics: List[String])(implicit
166+
K: Deserializer[K],
167+
V: Deserializer[V]): Resource[Task, KafkaConsumerObservable[K, V, CommittableMessage[K, V]]] = {
168+
169+
val consumer = createConsumer[K, V](cfg, topics)
170+
manualCommit(cfg, consumer)
171+
}
172+
173+
/** Builds a [[KafkaConsumerObservable]] instance with ability to manual commit offsets
174+
* and forcibly disables auto commits in configuration.
175+
* Such instances emit [[CommittableMessage]] instead of Kafka's ConsumerRecord.
176+
*
177+
* ==Example==
178+
* {{{
179+
* KafkaConsumerResource.manualCommit[String,String](consumerCfg, List(topicName)).use{ committableMessages =>
180+
* committableMessages.map(message => message.record.value() -> message.committableOffset)
181+
* .mapEval { case (value, offset) => performBusinessLogic(value).map(_ => offset) }
182+
* .bufferTimedAndCounted(1.second, 1000)
183+
* .mapEval(offsets => CommittableOffsetBatch(offsets).commitSync())
184+
* .completedL
185+
* }
186+
* }}}
187+
*
188+
* @param cfg is the [[KafkaConsumerConfig]] needed for initializing the
189+
* consumer; also make sure to see `monix/kafka/default.conf` for
190+
* the default values being used. Auto commit will disabled and
191+
* observable commit order will turned to [[monix.kafka.config.ObservableCommitOrder.NoAck NoAck]] forcibly!
192+
*
193+
* @param topicsRegex is the pattern of Kafka topics to subscribe to.
194+
*/
195+
def manualCommit[K, V](cfg: KafkaConsumerConfig, topicsRegex: Regex)(implicit
196+
K: Deserializer[K],
197+
V: Deserializer[V]): Resource[Task, KafkaConsumerObservable[K, V, CommittableMessage[K, V]]] = {
198+
199+
val consumer = createConsumer[K, V](cfg, topicsRegex)
200+
manualCommit(cfg, consumer)
201+
}
202+
203+
}

kafka-1.0.x/src/main/scala/monix/kafka/config/ObservableCommitOrder.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ sealed trait ObservableCommitOrder extends Serializable {
4646
}
4747
}
4848

49+
4950
object ObservableCommitOrder {
5051

5152
@throws(classOf[BadValue])
@@ -61,7 +62,7 @@ object ObservableCommitOrder {
6162
/** Do a `commit` in the Kafka Consumer before
6263
* receiving an acknowledgement from downstream.
6364
*/
64-
case object BeforeAck extends ObservableCommitOrder {
65+
case object BeforeAck extends ObservableCommitOrder {
6566
val id = "before-ack"
6667
}
6768

0 commit comments

Comments
 (0)