Commit 3c80f39

PMM-272 fs2 streams Kafka support
1 parent 2bfbde4 commit 3c80f39

9 files changed: +223 -72 lines changed

build.sbt

Lines changed: 1 addition & 1 deletion
@@ -167,7 +167,7 @@ lazy val config = project.in(file("config"))
 lazy val streaming = project.in(file("streaming"))
   .settings(commonSettings)
   .settings(
-    libraryDependencies ++= Library.fs2 ++ Library.doobie ++ Library.scalaTest ++ Library.dbDrivers ++ Library.redisson ++ Library.logging ++ Library.jackson,
+    libraryDependencies ++= Library.fs2 ++ Library.fs2Kafka ++ Library.doobie ++ Library.scalaTest ++ Library.dbDrivers ++ Library.redisson ++ Library.logging ++ Library.jackson,
     dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-core" % "2.10.0"
   )
   .dependsOn(core, config, dsl)

http/src/main/scala/ru/itclover/tsp/http/protocols/RoutesProtocols.scala

Lines changed: 11 additions & 2 deletions
@@ -194,7 +194,8 @@ trait RoutesProtocols extends SprayJsonSupport with DefaultJsonProtocol {
   }
 
   // implicit val jdbcSinkSchemaFmt = jsonFormat(JDBCSegmentsSink.apply, "tableName", "rowSchema")
-  implicit val jdbcOutConfFmt = jsonFormat(JDBCOutputConf.apply,
+  implicit val jdbcOutConfFmt = jsonFormat(
+    JDBCOutputConf.apply,
     "tableName",
     "rowSchema",
     "jdbcUrl",
@@ -205,7 +206,15 @@ trait RoutesProtocols extends SprayJsonSupport with DefaultJsonProtocol {
     "parallelism"
   )
 
-  implicit val kafkaOutConfFmt = jsonFormat5(KafkaOutputConf.apply)
+  implicit val kafkaOutConfFmt = jsonFormat(
+    KafkaOutputConf.apply,
+    "broker",
+    "topic",
+    "serializer",
+    "rowSchema",
+    "parallelism"
+  )
+
   implicit def outConfFmt[Event] =
     new RootJsonFormat[OutputConf[Event]] {
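For context: kafkaOutConfFmt now uses spray-json's named-field jsonFormat, which pins JSON keys to constructor parameters. A minimal sketch of that mechanism, using a simplified stand-in case class (KafkaOutExample and its values are illustrative, not the real KafkaOutputConf):

import spray.json._
import DefaultJsonProtocol._

object KafkaOutFormatSketch extends App {
  // Hypothetical, simplified stand-in for KafkaOutputConf (rowSchema omitted),
  // used here only to illustrate the named-field format.
  final case class KafkaOutExample(
    broker: String,
    topic: String,
    serializer: String = "json",
    parallelism: Option[Int] = Some(1)
  )

  // Explicit field names fix the wire format, so renaming a constructor
  // parameter later does not silently change the accepted JSON keys.
  implicit val kafkaOutExampleFmt: RootJsonFormat[KafkaOutExample] =
    jsonFormat(KafkaOutExample.apply, "broker", "topic", "serializer", "parallelism")

  val json = """{"broker":"localhost:9092","topic":"events","serializer":"json","parallelism":1}"""
  println(json.parseJson.convertTo[KafkaOutExample])
}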

http/src/main/scala/ru/itclover/tsp/http/services/queuing/QueueManagerService.scala

Lines changed: 2 additions & 1 deletion
@@ -233,7 +233,8 @@ class QueueManagerService(uri: Uri, blockingExecutionContext: ExecutionContextEx
         // TODO: Report throwable
         log.error(s"Job $uuid failed: $throwable")
       case Right(_) =>
-        // success,
+        // success
+        log.info(s"Job $uuid finished")
     }
   }

integration/correctness/src/test/scala/ru/itclover/tsp/http/SimpleCasesTest.scala

Lines changed: 63 additions & 56 deletions
@@ -3,10 +3,15 @@ package ru.itclover.tsp.http
 import java.util.concurrent.{SynchronousQueue, ThreadPoolExecutor, TimeUnit}
 import akka.http.scaladsl.model.StatusCodes
 import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest}
+import cats.effect.IO
+import cats.effect.unsafe.implicits.global
 import com.dimafeng.testcontainers._
+import fs2.kafka.{Acks, KafkaProducer, ProducerRecord, ProducerRecords, ProducerSettings, Serializer}
 import ru.itclover.tsp.http.routes.JobReporting
 import ru.itclover.tsp.streaming.io.{IntESValue, StringESValue}
 
+import scala.concurrent.duration.FiniteDuration
+
 //import com.google.common.util.concurrent.ThreadFactoryBuilder
 
 import java.util.{Properties, UUID}
@@ -285,10 +290,11 @@ class SimpleCasesTest
   )
 
   val wideKafkaOutputConf = JDBCOutputConf(
-    "events_wide_kafka_test",
-    wideKafkaRowSchema,
-    s"jdbc:clickhouse://localhost:$port/default",
-    "ru.yandex.clickhouse.ClickHouseDriver"
+    tableName = "events_wide_kafka_test",
+    rowSchema = wideKafkaRowSchema,
+    jdbcUrl = chConnection,
+    driverName = chDriver,
+    userName = Some("default")
   )
 
   override def afterStart(): Unit = {
@@ -321,16 +327,25 @@ class SimpleCasesTest
 
     // Kafka producer
    // TODO: Send to Kafka
-    val props = new Properties()
-    props.put("bootstrap.servers", kafkaBrokerUrl)
-    props.put("acks", "all")
-    props.put("retries", "2")
-    props.put("auto.commit.interval.ms", "1000")
-    props.put("linger.ms", "1")
-    props.put("block.on.buffer.full", "true")
-    props.put("auto.create.topics.enable", "true")
-    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
-    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
+    // val props = new Properties()
+    // props.put("bootstrap.servers", kafkaBrokerUrl)
+    // props.put("acks", "all")
+    // props.put("retries", "2")
+    // props.put("auto.commit.interval.ms", "1000")
+    // props.put("linger.ms", "1")
+    // props.put("block.on.buffer.full", "true")
+    // props.put("auto.create.topics.enable", "true")
+    // props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
+    // props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
+
+    val producerSettings = ProducerSettings(
+      keySerializer = Serializer[IO, String],
+      valueSerializer = Serializer[IO, String]
+    ).withBootstrapServers(kafkaBrokerUrl)
+      .withRetries(2)
+      .withLinger(FiniteDuration.apply(1, TimeUnit.MILLISECONDS))
+      .withAcks(Acks.All)
+      .withProperty("auto.create.topics.enable", "true")
 
 
     insertInfo.foreach(elem => {
@@ -343,11 +358,11 @@ class SimpleCasesTest
       clickhouseContainer.executeUpdate(s"INSERT INTO ${elem._1} FORMAT CSV\n${insertData}")
 
       val headers = Files.readResource(elem._2).take(1).toList.headOption.getOrElse("").split(",")
-      val data = Files.readResource(elem._2).drop(1).map(_.split(","))
+      val data = Files.readResource(elem._2).drop(1).map(_.split(",")).toArray
      val numberIndices =
         List("dt", "ts", "POilDieselOut", "SpeedThrustMin", "PowerPolling", "value_float").map(headers.indexOf(_))
 
-      data.foreach {
+      fs2.Stream.emits(data).map {
         row =>
           val convertedRow: Seq[Any] = row.indices.map(
             idx =>
@@ -368,11 +383,15 @@ class SimpleCasesTest
              .mkString(", ") + "}"
            val topic = elem._1.filter(_ != '`')
            println(s"Sending to $topic $msgKey --- $json")
-            //producer.send(new ProducerRecord[String, String](topic, msgKey, json)).get()
+            val rec = ProducerRecord(topic, msgKey, json)
+            ProducerRecords.one(rec)
       }
-    })
+        .through(KafkaProducer.pipe(producerSettings))
+        .compile
+        .drain
+        .unsafeRunSync
 
-      //producer.close()
+    })
   }
 
   def firstValidationQuery(table: String, numbers: Seq[Range]) = s"""
@@ -536,42 +555,30 @@ class SimpleCasesTest
     inner(numbers, Nil)
   }
 
-  // "Cases 1-17, 43-50" should "work in wide Kafka table" in {
-  //   casesPatterns.keys.foreach { id =>
-  //     Post(
-  //       "/job/submit/",
-  //       FindPatternsRequest(s"17kafkawide_$id", wideKafkaInputConf, wideKafkaOutputConf, List(casesPatterns(id)))
-  //     ) ~>
-  //     route ~> check {
-  //       withClue(s"Pattern ID: $id") {
-  //         status shouldEqual StatusCodes.OK
-  //       }
-  //       //alertByQuery(List(List(id.toDouble, incidentsCount(id).toDouble)), s"SELECT $id, COUNT(*) FROM events_wide_test WHERE id = $id")
-  //     }
-  //   }
-  //   Thread.sleep(60000)
-  //   casesPatterns.keys.foreach { id =>
-  //     Get(
-  //       s"/job/17kafkawide_$id/stop"
-  //     ) ~>
-  //     route ~> check {
-  //       withClue(s"Pattern ID: $id") {
-  //         status shouldEqual StatusCodes.OK
-  //         //response.toString shouldBe ""
-  //       }
-  //       //alertByQuery(List(List(id.toDouble, incidentsCount(id).toDouble)), s"SELECT $id, COUNT(*) FROM events_wide_test WHERE id = $id")
-  //     }
-  //   }
-  //   alertByQuery(
-  //     incidentsCount
-  //       .map {
-  //         case (k, v) => List(k.toDouble, v.toDouble)
-  //       }
-  //       .toList
-  //       .sortBy(_.headOption.getOrElse(Double.NaN)),
-  //     firstValidationQuery("events_wide_kafka_test", numbersToRanges(casesPatterns.keys.map(_.toInt).toList.sorted))
-  //   )
-  //   alertByQuery(incidentsTimestamps, secondValidationQuery.format("events_wide_kafka_test"))
-  // }
+  "Cases 1-17, 43-50" should "work in wide Kafka table" in {
+    casesPatterns.keys.foreach { id =>
+      Post(
+        "/job/submit/",
+        FindPatternsRequest(s"17kafkawide_$id", wideKafkaInputConf, Seq(wideKafkaOutputConf), 50, List(casesPatterns(id)))
+      ) ~>
+      route ~> check {
+        withClue(s"Pattern ID: $id") {
+          status shouldEqual StatusCodes.OK
+        }
+        //alertByQuery(List(List(id.toDouble, incidentsCount(id).toDouble)), s"SELECT $id, COUNT(*) FROM events_wide_test WHERE id = $id")
+      }
+    }
+    Thread.sleep(60000)
+    alertByQuery(
+      incidentsCount
+        .map {
+          case (k, v) => List(k.toDouble, v.toDouble)
+        }
+        .toList
+        .sortBy(_.headOption.getOrElse(Double.NaN)),
+      firstValidationQuery("events_wide_kafka_test", numbersToRanges(casesPatterns.keys.map(_.toInt).toList.sorted))
+    )
+    alertByQuery(incidentsTimestamps, secondValidationQuery.format("events_wide_kafka_test"))
+  }
   override val reporting: Option[JobReporting] = None
 }
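For reference, the fs2-kafka producer pattern this test now uses, distilled into a self-contained sketch (fs2-kafka 2.x); the broker address, topic and payloads below are placeholders:

import cats.effect.{IO, IOApp}
import fs2.kafka.{Acks, KafkaProducer, ProducerRecord, ProducerRecords, ProducerSettings, Serializer}

object ProducerSketch extends IOApp.Simple {
  // Placeholder broker; the test uses the Testcontainers Kafka broker URL.
  val settings = ProducerSettings(
    keySerializer = Serializer[IO, String],
    valueSerializer = Serializer[IO, String]
  ).withBootstrapServers("localhost:9092")
    .withAcks(Acks.All)

  def run: IO[Unit] =
    fs2.Stream
      .emits(List("a" -> "1", "b" -> "2"))                                   // placeholder payloads
      .map { case (k, v) => ProducerRecords.one(ProducerRecord("example-topic", k, v)) }
      .through(KafkaProducer.pipe(settings))                                 // send each batch, emit results
      .compile
      .drain
}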

project/Library.scala

Lines changed: 6 additions & 1 deletion
@@ -15,7 +15,8 @@ object Version {
   val akkaHttp = "10.1.9"
 
   val cats = "3.3.12"
-  val fs2 = "3.2.7"
+  val fs2 = "3.2.8"
+  val fs2Kafka = "2.4.0"
   val doobie = "1.0.0-RC2"
 
   val scalaTest = "3.0.8"
@@ -112,6 +113,10 @@ object Library {
     "co.fs2" %% "fs2-core" % Version.fs2
   )
 
+  val fs2Kafka: Seq[ModuleID] = Seq(
+    "com.github.fd4s" %% "fs2-kafka" % Version.fs2Kafka
+  )
+
   val doobie: Seq[ModuleID] = Seq(
     "org.tpolecat" %% "doobie-core" % Version.doobie
   )

streaming/src/main/scala/ru/itclover/tsp/streaming/PatternsSearchJob.scala

Lines changed: 11 additions & 8 deletions
@@ -136,14 +136,17 @@ case class PatternsSearchJob[In, InKey, InItem](
       case Some(_) =>
         import source.{extractor, timeExtractor, eventCreator, kvExtractor, keyCreator}
         dataStream
-          //.keyBy(source.partitioner)
-          .map( event =>
-            SparseRowsDataAccumulator[In, InKey, InItem, In](
-              source.asInstanceOf[StreamSource[In, InKey, InItem]],
-              source.patternFields
-            ).map(event)
-          )
-          .unNone
+          .through(StreamPartitionOps.groupBy(p => IO { source.partitioner(p) }))
+          .map {
+            case (_, str) => str.map( event =>
+              SparseRowsDataAccumulator[In, InKey, InItem, In](
+                source.asInstanceOf[StreamSource[In, InKey, InItem]],
+                source.patternFields
+              ).map(event)
+            )
+            .unNone
+          }
+          .parJoinUnbounded
         //.setParallelism(1) // SparseRowsDataAccumulator cannot work in parallel
       case _ => dataStream
     }
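The shape of this change is: split the stream into per-key substreams, run the stateful SparseRowsDataAccumulator over each substream, and merge the results with parJoinUnbounded. A rough self-contained fs2 sketch of that shape; it pre-partitions a finite input with a plain groupBy instead of the repository's StreamPartitionOps.groupBy, whose exact behaviour is only assumed here:

import cats.effect.{IO, IOApp}
import fs2.Stream

object GroupedAccumulationSketch extends IOApp.Simple {
  final case class Event(key: String, value: Int)

  // Placeholder input; in PatternsSearchJob the keys come from source.partitioner.
  val events = List(Event("a", 1), Event("b", 10), Event("a", 2), Event("b", 20))

  def run: IO[Unit] =
    Stream
      .emits(events.groupBy(_.key).toList)          // one (key, partition) pair per key
      .covary[IO]
      .map { case (key, partition) =>
        Stream
          .emits(partition)
          .covary[IO]
          .scan(0)(_ + _.value)                     // stateful work per partition substream
          .drop(1)                                  // skip the initial zero
          .evalMap(sum => IO.println(s"$key running sum: $sum"))
      }
      .parJoinUnbounded                             // merge the per-partition streams
      .compile
      .drain
}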

streaming/src/main/scala/ru/itclover/tsp/streaming/StreamSource.scala

Lines changed: 28 additions & 1 deletion
@@ -8,11 +8,13 @@ import doobie.implicits._
 import java.sql.{PreparedStatement, ResultSet}
 import doobie.{ConnectionIO, FC, FPS, FRS, PreparedStatementIO, ResultSetIO, Transactor}
 import doobie.util.stream.repeatEvalChunks
+import fs2.kafka.{AutoOffsetReset, ConsumerSettings, Deserializer, KafkaConsumer}
 import ru.itclover.tsp.StreamSource.Row
 import ru.itclover.tsp.core.Pattern.{Idx, IdxExtractor}
 import ru.itclover.tsp.core.io.{Decoder, Extractor, TimeExtractor}
 import ru.itclover.tsp.services.KafkaService
 import ru.itclover.tsp.streaming.io._
+import ru.itclover.tsp.streaming.serialization.JsonDeserializer
 import ru.itclover.tsp.streaming.services.JdbcService
 import ru.itclover.tsp.streaming.transformers.SparseRowsDataAccumulator
 import ru.itclover.tsp.streaming.utils.{EventCreator, EventCreatorInstances, KeyCreator, KeyCreatorInstances}
@@ -359,7 +361,32 @@ case class KafkaSource(
 
   val stageName = "Kafka input processing stage"
 
-  def createStream: fs2.Stream[IO, RowWithIdx] = ???
+  val consumerSettings = ConsumerSettings(
+    keyDeserializer = Deserializer.unit[IO],
+    valueDeserializer = Deserializer.instance[IO, Row](
+      (topic, headers, bytes) => {
+        val deserialized = JsonDeserializer(fieldsClasses).deserialize(bytes)
+        deserialized match {
+          case Right(value)    => IO.pure(value)
+          case Left(throwable) => ??? // TODO: Deserialization error
+        }
+      }
+    )
+  )
+    .withBootstrapServers(conf.brokers)
+    .withGroupId(conf.group)
+    .withAutoOffsetReset(AutoOffsetReset.Latest)
+
+  def createStream: fs2.Stream[IO, RowWithIdx] =
+    KafkaConsumer
+      .stream(consumerSettings)
+      .subscribeTo(conf.topic)
+      .records
+      .map { committable =>
+        committable.record.value
+      }
+      .zipWithIndex
+      .map { case (r, i) => RowWithIdx(i + 1, r) }
 
   def partitionsIdx = conf.partitionFields.filter(fieldsIdxMap.contains).map(fieldsIdxMap)
   def transformedPartitionsIdx = conf.partitionFields.map(transformedFieldsIdxMap)
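For reference, the consume-and-index pattern used by createStream, as a self-contained fs2-kafka 2.x sketch; broker, group id and topic are placeholders, and the real code deserializes each payload into a Row with the project's JsonDeserializer rather than reading plain strings:

import cats.effect.{IO, IOApp}
import fs2.kafka.{AutoOffsetReset, ConsumerSettings, Deserializer, KafkaConsumer}

object ConsumerSketch extends IOApp.Simple {
  // Placeholder connection details; KafkaSource takes them from its conf.
  val settings = ConsumerSettings(
    keyDeserializer = Deserializer.unit[IO],
    valueDeserializer = Deserializer[IO, String]
  ).withBootstrapServers("localhost:9092")
    .withGroupId("example-group")
    .withAutoOffsetReset(AutoOffsetReset.Latest)

  def run: IO[Unit] =
    KafkaConsumer
      .stream(settings)
      .subscribeTo("example-topic")
      .records                                     // stream of committable records
      .map(_.record.value)                         // keep only the payload
      .zipWithIndex                                // attach an index, as KafkaSource does
      .evalMap { case (value, i) => IO.println(s"${i + 1}: $value") }
      .compile
      .drain
}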

streaming/src/main/scala/ru/itclover/tsp/streaming/io/OutputConf.scala

Lines changed: 62 additions & 2 deletions
@@ -2,14 +2,20 @@ package ru.itclover.tsp.streaming.io
 
 import cats.effect.{IO, MonadCancelThrow, Resource}
 import cats.implicits._
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.databind.node.ObjectNode
 import doobie.WeakAsync.doobieWeakAsyncForAsync
 import doobie.{ConnectionIO, FC, Transactor, Update0}
 import doobie.implicits._
 import doobie.util.fragment.Fragment
 import fs2.Pipe
+import fs2.kafka.{Acks, KafkaProducer, ProducerRecord, ProducerRecords, ProducerSettings, Serializer}
 import ru.itclover.tsp.StreamSource.Row
 
-import java.sql.Connection
+import java.sql.{Connection, Timestamp}
+import java.time.{ZoneId, ZonedDateTime}
+import java.util.concurrent.TimeUnit
+import scala.concurrent.duration.FiniteDuration
 import scala.util.control.NonFatal
 
 trait OutputConf[Event] {
@@ -137,5 +143,59 @@ case class KafkaOutputConf(
   rowSchema: EventSchema,
   parallelism: Option[Int] = Some(1)
 ) extends OutputConf[Row] {
-  override def getSink: Pipe[IO, Row, Unit] = ???
+  val producerSettings = ProducerSettings(
+    keySerializer = Serializer[IO, String],
+    valueSerializer = Serializer[IO, String]
+  ).withBootstrapServers(broker)
+    .withAcks(Acks.All)
+    .withProperty("auto.create.topics.enable", "true")
+
+  override def getSink: Pipe[IO, Row, Unit] = stream =>
+    stream
+      .map { data =>
+        val serialized = serialize(data, rowSchema)
+        val rec = ProducerRecord(topic, ZonedDateTime.now(ZoneId.of("UTC")).toString, serialized)
+        ProducerRecords.one(rec)
+      }
+      .through(KafkaProducer.pipe(producerSettings))
+      .drain
+
+  def serialize(output: Row, eventSchema: EventSchema): String = {
+
+    val mapper = new ObjectMapper()
+    val root = mapper.createObjectNode()
+
+    // TODO: Write JSON
+
+    eventSchema match {
+      case newRowSchema: NewRowSchema =>
+        newRowSchema.data.foreach { case (k, v) =>
+          putValueToObjectNode(k, v, root, output(newRowSchema.fieldsIndices(Symbol(k))))
+        }
+    }
+
+    def putValueToObjectNode(k: String,
+                             v: EventSchemaValue,
+                             root: ObjectNode,
+                             value: Object): Unit = {
+      v.`type` match {
+        case "int8"      => root.put(k, value.asInstanceOf[Byte])
+        case "int16"     => root.put(k, value.asInstanceOf[Short])
+        case "int32"     => root.put(k, value.asInstanceOf[Int])
+        case "int64"     => root.put(k, value.asInstanceOf[Long])
+        case "float32"   => root.put(k, value.asInstanceOf[Float])
+        case "float64"   => root.put(k, value.asInstanceOf[Double])
+        case "boolean"   => root.put(k, value.asInstanceOf[Boolean])
+        case "string"    => root.put(k, value.asInstanceOf[String])
+        case "timestamp" => root.put(k, value.asInstanceOf[Timestamp].toString)
+        case "object" =>
+          val data = value.toString
+          val parsedJson = mapper.readTree(data)
+          root.put(k, parsedJson)
+      }
+    }
+
+    mapper.writeValueAsString(root)
+
+  }
 }
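For reference, a minimal sketch of the Jackson ObjectNode construction that serialize performs; field names and values are illustrative, and ObjectNode.set is used where the commit calls the deprecated put(String, JsonNode) overload:

import com.fasterxml.jackson.databind.ObjectMapper

object SerializeSketch extends App {
  val mapper = new ObjectMapper()
  val root   = mapper.createObjectNode()

  root.put("id", 42L)                                         // "int64" branch
  root.put("value_float", 3.14)                               // "float64" branch
  root.put("from", "2022-06-01 00:00:00.0")                   // "timestamp" rendered as a string
  root.set("context", mapper.readTree("""{"unit":"bar"}"""))  // "object" branch: parsed JSON subtree

  // Prints: {"id":42,"value_float":3.14,"from":"2022-06-01 00:00:00.0","context":{"unit":"bar"}}
  println(mapper.writeValueAsString(root))
}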
