Skip to content

Commit 29b78b5

Browse files
NeQuissimusmanub
authored andcommitted
Kafka 2.0.0 (#154)
1 parent a5223a9 commit 29b78b5

File tree

14 files changed

+61
-75
lines changed

14 files changed

+61
-75
lines changed

.travis.yml

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
language: scala
22

3-
jdk:
3+
jdk:
44
- oraclejdk8
55

66
scala:
@@ -19,6 +19,5 @@ before_cache:
1919
- find $HOME/.ivy2/cache -name "ivydata-*.properties" -print -delete
2020
- find $HOME/.sbt -name "*.lock" -print -delete
2121

22-
script:
22+
script:
2323
- travis_retry sbt ++$TRAVIS_SCALA_VERSION test
24-

build.sbt

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@ import sbtrelease.Version
22

33
parallelExecution in ThisBuild := false
44

5-
val kafkaVersion = "1.1.1"
6-
val confluentVersion = "4.1.1"
5+
val kafkaVersion = "2.0.0"
6+
val confluentVersion = "5.0.0"
77
val akkaVersion = "2.5.14"
88

99
lazy val commonSettings = Seq(
@@ -22,6 +22,7 @@ lazy val commonSettings = Seq(
2222
lazy val commonLibrarySettings = libraryDependencies ++= Seq(
2323
"org.apache.avro" % "avro" % "1.8.2",
2424
"org.apache.kafka" %% "kafka" % kafkaVersion,
25+
"org.slf4j" % "slf4j-log4j12" % "1.7.25" % Test,
2526
"org.scalatest" %% "scalatest" % "3.0.5" % Test,
2627
"com.typesafe.akka" %% "akka-actor" % akkaVersion % Test,
2728
"com.typesafe.akka" %% "akka-testkit" % akkaVersion % Test
@@ -53,6 +54,13 @@ lazy val releaseSettings = Seq(
5354
releaseCrossBuild := true
5455
)
5556

57+
// https://github.com/sbt/sbt/issues/3618
58+
// [error] (kafkaStreams / update) sbt.librarymanagement.ResolveException: download failed: javax.ws.rs#javax.ws.rs-api;2.1!javax.ws.rs-api.${packaging.type}
59+
val workaround = {
60+
sys.props += "packaging.type" -> "jar"
61+
()
62+
}
63+
5664
lazy val root = (project in file("."))
5765
.settings(name := "scalatest-embedded-kafka-root")
5866
.settings(commonSettings: _*)

embedded-kafka/src/main/scala/net/manub/embeddedkafka/Codecs.scala

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,14 @@
11
package net.manub.embeddedkafka
22

3-
import kafka.serializer._
43
import org.apache.kafka.clients.consumer.ConsumerRecord
54
import org.apache.kafka.common.serialization._
65

76
/** useful encoders/serializers, decoders/deserializers and [[ConsumerRecord]] decoders**/
87
object Codecs {
9-
implicit val stringEncoder: Encoder[String] = new StringEncoder()
10-
implicit val nullEncoder: Encoder[Array[Byte]] = new DefaultEncoder()
118
implicit val stringSerializer: Serializer[String] = new StringSerializer()
129
implicit val nullSerializer: Serializer[Array[Byte]] =
1310
new ByteArraySerializer()
1411

15-
implicit val stringDecoder: Decoder[String] = new StringDecoder()
16-
implicit val nullDecoder: Decoder[Array[Byte]] = new DefaultDecoder()
1712
implicit val stringDeserializer: Deserializer[String] =
1813
new StringDeserializer()
1914
implicit val nullDeserializer: Deserializer[Array[Byte]] =

embedded-kafka/src/main/scala/net/manub/embeddedkafka/ConsumerExtensions.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ object ConsumerExtensions {
4848
import scala.collection.JavaConverters._
4949
consumer.subscribe(topics.asJava)
5050
topics.foreach(consumer.partitionsFor)
51-
val records = consumer.poll(poll)
51+
val records = consumer.poll(java.time.Duration.ofMillis(poll))
5252
// use toList to force eager evaluation. toSeq is lazy
5353
records.iterator().asScala.toList.map(decoder(_))
5454
}.recover {

embedded-kafka/src/main/scala/net/manub/embeddedkafka/EmbeddedKafka.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -588,7 +588,7 @@ private[embeddedkafka] trait EmbeddedKafkaSupport[C <: EmbeddedKafkaConfig] {
588588
topics.foreach(consumer.partitionsFor)
589589

590590
while (messagesRead < number && System.nanoTime < timeoutNanoTime) {
591-
val records = consumer.poll(1000)
591+
val records = consumer.poll(java.time.Duration.ofMillis(1000))
592592
val recordIter = records.iterator()
593593
if (resetTimeoutOnEachMessage && recordIter.hasNext) {
594594
timeoutNanoTime = System.nanoTime + timeout.toNanos

embedded-kafka/src/main/scala/net/manub/embeddedkafka/avro/avroMarshallers.scala

Lines changed: 15 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package net.manub.embeddedkafka.avro
22

33
import java.io.ByteArrayOutputStream
44

5-
import kafka.serializer.{Decoder, Encoder}
65
import kafka.utils.VerifiableProperties
76
import org.apache.avro.Schema
87
import org.apache.avro.io._
@@ -13,58 +12,39 @@ import org.apache.avro.specific.{
1312
}
1413
import org.apache.kafka.common.serialization.{Deserializer, Serializer}
1514

16-
class KafkaAvroDecoder[T <: SpecificRecord](schema: Schema,
17-
props: VerifiableProperties = null)
18-
extends Decoder[T] {
19-
private val NoInstanceReuse = null.asInstanceOf[T]
20-
private val NoDecoderReuse = null.asInstanceOf[BinaryDecoder]
15+
class KafkaAvroDeserializer[T <: SpecificRecord](schema: Schema)
16+
extends Deserializer[T]
17+
with NoOpConfiguration
18+
with NoOpClose {
19+
2120
private val reader = new SpecificDatumReader[T](schema)
2221

23-
override def fromBytes(bytes: Array[Byte]): T = {
24-
val decoder = DecoderFactory.get().binaryDecoder(bytes, NoDecoderReuse)
25-
reader.read(NoInstanceReuse, decoder)
22+
override def deserialize(topic: String, data: Array[Byte]): T = {
23+
val decoder = DecoderFactory.get().binaryDecoder(data, null)
24+
reader.read(null.asInstanceOf[T], decoder)
2625
}
2726
}
2827

29-
class KafkaAvroEncoder[T <: SpecificRecord](props: VerifiableProperties = null)
30-
extends Encoder[T] {
31-
private val NoEncoderReuse = null.asInstanceOf[BinaryEncoder]
28+
class KafkaAvroSerializer[T <: SpecificRecord]()
29+
extends Serializer[T]
30+
with NoOpConfiguration
31+
with NoOpClose {
3232

33-
override def toBytes(nullableData: T): Array[Byte] = {
33+
private def toBytes(nullableData: T): Array[Byte] =
3434
Option(nullableData).fold[Array[Byte]](null) { data =>
3535
val writer: DatumWriter[T] = new SpecificDatumWriter[T](data.getSchema)
3636
val out = new ByteArrayOutputStream()
37-
val encoder = EncoderFactory.get.binaryEncoder(out, NoEncoderReuse)
37+
val encoder = EncoderFactory.get.binaryEncoder(out, null)
3838

3939
writer.write(data, encoder)
4040
encoder.flush()
4141
out.close()
4242

4343
out.toByteArray
4444
}
45-
}
46-
}
47-
48-
class KafkaAvroDeserializer[T <: SpecificRecord](schema: Schema)
49-
extends Deserializer[T]
50-
with NoOpConfiguration
51-
with NoOpClose {
52-
53-
private val decoder = new KafkaAvroDecoder[T](schema = schema)
54-
55-
override def deserialize(topic: String, data: Array[Byte]): T =
56-
decoder.fromBytes(data)
57-
}
58-
59-
class KafkaAvroSerializer[T <: SpecificRecord]()
60-
extends Serializer[T]
61-
with NoOpConfiguration
62-
with NoOpClose {
63-
64-
private val encoder = new KafkaAvroEncoder[T]()
6545

6646
override def serialize(topic: String, data: T): Array[Byte] =
67-
encoder.toBytes(data)
47+
toBytes(data)
6848
}
6949

7050
sealed trait NoOpConfiguration {
Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,15 @@
11
package net.manub.embeddedkafka
22

3-
import kafka.serializer.{Decoder, Encoder}
43
import kafka.utils.VerifiableProperties
54
import org.apache.avro.Schema
65
import org.apache.avro.specific.SpecificRecord
76
import org.apache.kafka.common.serialization.{Deserializer, Serializer}
87

98
package object avro {
10-
119
implicit def specificAvroSerializer[T <: SpecificRecord]: Serializer[T] =
1210
new KafkaAvroSerializer[T]
13-
implicit def specificAvroEncoder[T <: SpecificRecord]: Encoder[T] =
14-
new KafkaAvroEncoder[T]
1511

1612
def specificAvroDeserializer[T <: SpecificRecord](
1713
schema: Schema): Deserializer[T] =
1814
new KafkaAvroDeserializer[T](schema)
19-
20-
def specificAvroDecoder[T <: SpecificRecord](schema: Schema,
21-
props: VerifiableProperties =
22-
null): Decoder[T] =
23-
new KafkaAvroDecoder[T](schema, props)
2415
}

embedded-kafka/src/test/scala/net/manub/embeddedkafka/ConsumerExtensionsSpec.scala

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,13 @@ class ConsumerExtensionsSpec
3030
.empty[TopicPartition, java.util.List[ConsumerRecord[String, String]]]
3131
.asJava)
3232

33-
when(consumer.poll(retryConf.poll)).thenReturn(consumerRecords)
33+
when(consumer.poll(java.time.Duration.ofMillis(retryConf.poll)))
34+
.thenReturn(consumerRecords)
3435

3536
consumer.consumeLazily[String]("topic")
3637

37-
verify(consumer, times(retryConf.maximumAttempts)).poll(retryConf.poll)
38+
verify(consumer, times(retryConf.maximumAttempts))
39+
.poll(java.time.Duration.ofMillis(retryConf.poll))
3840
}
3941

4042
"not retry to get messages with the configured maximum number of attempts when poll succeeds" in {
@@ -48,11 +50,12 @@ class ConsumerExtensionsSpec
4850
new TopicPartition("topic", 1) -> List(consumerRecord).asJava).asJava
4951
)
5052

51-
when(consumer.poll(retryConf.poll)).thenReturn(consumerRecords)
53+
when(consumer.poll(java.time.Duration.ofMillis(retryConf.poll)))
54+
.thenReturn(consumerRecords)
5255

5356
consumer.consumeLazily[String]("topic")
5457

55-
verify(consumer).poll(retryConf.poll)
58+
verify(consumer).poll(java.time.Duration.ofMillis(retryConf.poll))
5659
}
5760

5861
"poll to get messages with the configured poll timeout" in {
@@ -65,11 +68,12 @@ class ConsumerExtensionsSpec
6568
.empty[TopicPartition, java.util.List[ConsumerRecord[String, String]]]
6669
.asJava)
6770

68-
when(consumer.poll(retryConf.poll)).thenReturn(consumerRecords)
71+
when(consumer.poll(java.time.Duration.ofMillis(retryConf.poll)))
72+
.thenReturn(consumerRecords)
6973

7074
consumer.consumeLazily[String]("topic")
7175

72-
verify(consumer).poll(retryConf.poll)
76+
verify(consumer).poll(java.time.Duration.ofMillis(retryConf.poll))
7377
}
7478
}
7579

embedded-kafka/src/test/scala/net/manub/embeddedkafka/EmbeddedKafkaMethodsSpec.scala

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,8 @@ class EmbeddedKafkaMethodsSpec
5151
val consumer = kafkaConsumer
5252
consumer.subscribe(List(topic).asJava)
5353

54-
val records = consumer.poll(consumerPollTimeout)
54+
val records =
55+
consumer.poll(java.time.Duration.ofMillis(consumerPollTimeout))
5556

5657
records.iterator().hasNext shouldBe true
5758
val record = records.iterator().next()
@@ -77,7 +78,8 @@ class EmbeddedKafkaMethodsSpec
7778
val consumer = kafkaConsumer
7879
consumer.subscribe(List(topic).asJava)
7980

80-
val records = consumer.poll(consumerPollTimeout)
81+
val records =
82+
consumer.poll(java.time.Duration.ofMillis(consumerPollTimeout))
8183

8284
records.iterator().hasNext shouldBe true
8385
val record = records.iterator().next()
@@ -102,7 +104,8 @@ class EmbeddedKafkaMethodsSpec
102104
val consumer = kafkaConsumer
103105
consumer.subscribe(List(topic).asJava)
104106

105-
val records = consumer.poll(consumerPollTimeout)
107+
val records =
108+
consumer.poll(java.time.Duration.ofMillis(consumerPollTimeout))
106109

107110
records.iterator().hasNext shouldBe true
108111
val record = records.iterator().next()
@@ -129,7 +132,9 @@ class EmbeddedKafkaMethodsSpec
129132
val consumer = kafkaConsumer
130133
consumer.subscribe(List(topic).asJava)
131134

132-
val records = consumer.poll(consumerPollTimeout).iterator()
135+
val records = consumer
136+
.poll(java.time.Duration.ofMillis(consumerPollTimeout))
137+
.iterator()
133138

134139
records.hasNext shouldBe true
135140

embedded-kafka/src/test/scala/net/manub/embeddedkafka/EmbeddedKafkaObjectSpec.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,8 @@ class EmbeddedKafkaObjectSpec extends EmbeddedKafkaSpecSupport {
110110
kafkaConsumer(someOtherConfig, deserializer, deserializer)
111111
anotherConsumer.subscribe(List(topic).asJava)
112112

113-
val moreRecords = anotherConsumer.poll(consumerPollTimeout)
113+
val moreRecords =
114+
anotherConsumer.poll(java.time.Duration.ofMillis(consumerPollTimeout))
114115
moreRecords.count shouldBe 1
115116

116117
val someOtherRecord = moreRecords.iterator().next

0 commit comments

Comments
 (0)