Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ val flinkVersion = "1.8.0"

libraryDependencies += "org.apache.avro" % "avro" % "1.8.2"
libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % Provided
libraryDependencies += "org.apache.flink" % "flink-avro" % flinkVersion

// make run command include the provided dependencies
Compile / run := Defaults.runTask(Compile / fullClasspath,
Expand All @@ -32,3 +33,7 @@ Global / cancelable := true

// exclude Scala library from assembly
assembly / assemblyOption := (assembly / assemblyOption).value.copy(includeScala = false)

sourceGenerators in Compile += (avroScalaGenerateSpecific in Compile).taskValue

watchSources ++= ((avroSourceDirectories in Compile).value ** "*.avsc").get
1 change: 1 addition & 0 deletions project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
addSbtPlugin("com.julianpeeters" % "sbt-avrohugger" % "2.0.0-RC15")
23 changes: 23 additions & 0 deletions src/main/avro/product-description.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
{
"type": "record",
"namespace": "nl.mrooding.data",
"name": "ProductDescription",
"fields": [
{
"name": "id",
"type": "string"
},
{
"name": "description",
"type": "string"
},
{
"name": "updatedAt",
"type" : {
"type" : "long",
"logicalType" : "timestamp-millis",
"default": 0
}
}
]
}
23 changes: 23 additions & 0 deletions src/main/avro/product-stock.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
{
"type": "record",
"namespace": "nl.mrooding.data",
"name": "ProductStock",
"fields": [
{
"name": "id",
"type": "string"
},
{
"name": "stock",
"type": "long"
},
{
"name": "updatedAt",
"type" : {
"type" : "long",
"logicalType" : "timestamp-millis",
"default": 0
}
}
]
}
29 changes: 29 additions & 0 deletions src/main/avro/product.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
{
"type": "record",
"namespace": "nl.mrooding.data",
"name": "Product",
"fields": [
{
"name": "id",
"type": "string"
},
{
"name": "description",
"type": ["null", "string"],
"default": null
},
{
"name": "stock",
"type": ["null", "long"],
"default": null
},
{
"name": "updatedAt",
"type" : {
"type" : "long",
"logicalType" : "timestamp-millis",
"default": 0
}
}
]
}
22 changes: 22 additions & 0 deletions src/main/resources/avro/product-description.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{
"type": "record",
"name": "ProductDescription",
"fields": [
{
"name": "id",
"type": "string"
},
{
"name": "description",
"type": "string"
},
{
"name": "updatedAt",
"type" : {
"type" : "long",
"logicalType" : "timestamp-millis",
"default": 0
}
}
]
}
22 changes: 22 additions & 0 deletions src/main/resources/avro/product-stock.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{
"type": "record",
"name": "ProductStock",
"fields": [
{
"name": "id",
"type": "string"
},
{
"name": "stock",
"type": "long"
},
{
"name": "updatedAt",
"type" : {
"type" : "long",
"logicalType" : "timestamp-millis",
"default": 0
}
}
]
}
8 changes: 5 additions & 3 deletions src/main/resources/avro/product.avsc
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
},
{
"name": "updatedAt",
"type": "long",
"logicalType": "timestamp-millis",
"default": 0
"type" : {
"type" : "long",
"logicalType" : "timestamp-millis",
"default": 0
}
}
]
}
4 changes: 2 additions & 2 deletions src/main/scala/nl/mrooding/ProductAggregator.scala
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package nl.mrooding

import nl.mrooding.data.{ProductDescription, ProductStock}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._
import nl.mrooding.source.{ProductDescriptionSource, ProductStockSource}
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object ProductAggregator {
private[this] val intervalMs = 1000
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/nl/mrooding/ProductProcessor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import org.apache.flink.util.Collector

case class ProductProcessor() extends CoProcessFunction[ProductDescription, ProductStock, Product] {
private[this] lazy val stateDescriptor: ValueStateDescriptor[Product] =
new ValueStateDescriptor[Product]("product-join", Product.serializer)
new ValueStateDescriptor[Product]("product-join", classOf[Product])
private[this] lazy val state: ValueState[Product] = getRuntimeContext.getState(stateDescriptor)

override def processElement1(value: ProductDescription, ctx: CoProcessFunction[ProductDescription, ProductStock, Product]#Context, out: Collector[Product]): Unit = {
Expand Down
7 changes: 7 additions & 0 deletions src/main/scala/nl/mrooding/data/ProductDescriptionOld.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package nl.mrooding.data

import java.time.Instant

case class ProductDescriptionOld(id: String,
description: String,
updatedAt: Instant)
40 changes: 40 additions & 0 deletions src/main/scala/nl/mrooding/data/ProductOld.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package nl.mrooding.data

import java.time.Instant

import nl.mrooding.state.ProductSerializer
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.flink.api.common.typeutils.TypeSerializer

case class ProductOld(
id: String,
description: Option[String],
stock: Option[Long],
updatedAt: Instant
) extends AvroGenericRecordWriter {

def toGenericRecord: GenericRecord = {
val genericRecord = new GenericData.Record(ProductOld.getCurrentSchema)
genericRecord.put("id", id)
genericRecord.put("description", description.orNull)
genericRecord.put("stock", stock.getOrElse(0l))
genericRecord.put("updatedAt", updatedAt.toEpochMilli)

genericRecord
}
}

object ProductOld extends AvroSchema with AvroSerializable[ProductOld] {
val schemaPath: String = "/avro/product.avsc"

val serializer: TypeSerializer[ProductOld] = new ProductSerializer(None)

def apply(record: GenericRecord): ProductOld = {
ProductOld(
id = record.get("id").toString,
description = Option(record.get("description")).map(_.toString),
stock = Option(record.get("stock")).map(_.asInstanceOf[Long]),
updatedAt = Instant.ofEpochMilli(record.get("updatedAt").asInstanceOf[Long])
)
}
}
7 changes: 7 additions & 0 deletions src/main/scala/nl/mrooding/data/ProductStockOld.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package nl.mrooding.data

import java.time.Instant

case class ProductStockOld(id: String,
stock: Long,
updatedAt: Instant)
14 changes: 7 additions & 7 deletions src/main/scala/nl/mrooding/state/ProductSerializer.scala
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
package nl.mrooding.state

import nl.mrooding.data.Product
import nl.mrooding.data.ProductOld
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSnapshot}
import org.apache.flink.util.InstantiationUtil

class ProductSerializer(val stateSchema: Option[Schema]) extends CustomAvroSerializer[Product] {
class ProductSerializer(val stateSchema: Option[Schema]) extends CustomAvroSerializer[ProductOld] {

override def getCurrentSchema: Schema = Product.getCurrentSchema
override def getCurrentSchema: Schema = ProductOld.getCurrentSchema

override def fromGenericRecord(genericRecord: GenericRecord): Product = Product.apply(genericRecord)
override def fromGenericRecord(genericRecord: GenericRecord): ProductOld = ProductOld.apply(genericRecord)

override def duplicate(): TypeSerializer[Product] =
override def duplicate(): TypeSerializer[ProductOld] =
new ProductSerializer(stateSchema)

override def createInstance(): Product = InstantiationUtil.instantiate(classOf[Product])
override def createInstance(): ProductOld = InstantiationUtil.instantiate(classOf[ProductOld])

override def snapshotConfiguration(): TypeSerializerSnapshot[Product] = new ProductSerializerSnapshot()
override def snapshotConfiguration(): TypeSerializerSnapshot[ProductOld] = new ProductSerializerSnapshot()
}
10 changes: 5 additions & 5 deletions src/main/scala/nl/mrooding/state/ProductSerializerSnapshot.scala
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
package nl.mrooding.state

import nl.mrooding.data.Product
import nl.mrooding.data.ProductOld
import org.apache.avro.Schema
import org.apache.flink.api.common.typeutils.TypeSerializer

class ProductSerializerSnapshot(var stateSchema: Option[Schema]) extends CustomAvroSerializerSnapshot[Product] {
class ProductSerializerSnapshot(var stateSchema: Option[Schema]) extends CustomAvroSerializerSnapshot[ProductOld] {
def this() = {
this(None)
this(null)
}

override def getCurrentSchema: Schema = Product.getCurrentSchema
override def getCurrentSchema: Schema = ProductOld.getCurrentSchema

override def restoreSerializer(): TypeSerializer[Product] = new ProductSerializer(stateSchema)
override def restoreSerializer(): TypeSerializer[ProductOld] = new ProductSerializer(stateSchema)
}