Skip to content

Commit ee4711d

Browse files
Merge branch 'master' into update/sbt-mdoc-2.8.0
2 parents faeeeb5 + 13ae76c commit ee4711d

File tree

19 files changed

+849
-27
lines changed

19 files changed

+849
-27
lines changed

.git-blame-ignore-revs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,3 +45,6 @@ b83264042216c7d83b73247b8b7a998902ec26d5
4545

4646
# Scala Steward: Reformat with scalafmt 3.9.10
4747
7d900f00d9e3f23f45fca7989ae6a4c1c75580c7
48+
49+
# Scala Steward: Reformat with scalafmt 3.10.0
50+
12eaae6b3f4f112a41e4e69dc5c7109ba0296dcd

.github/workflows/ci.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# This workflow will build a Java project with Maven
22
# For more information see: https://help.github.com/actions/language-and-framework-guides/building-and-testing-java-with-maven
33

4-
name: CI
4+
name: All Tests
55

66
on:
77
push:
@@ -15,15 +15,15 @@ jobs:
1515
strategy:
1616
matrix:
1717
java: [11, 17]
18-
scala: [2.13.17, 3.3.6]
18+
scala: [2.13.17, 3.3.7]
1919
flink: [1.19.3, 1.20.2]
2020
sbt-module: ['flink-1-api', 'scala-api-common']
2121
include:
2222
- scala: 2.13.17
2323
java: 17
2424
flink: 2.0.0
2525
sbt-module: 'flink-2-api'
26-
- scala: 3.3.6
26+
- scala: 3.3.7
2727
java: 21
2828
flink: 2.0.0
2929
sbt-module: 'flink-2-api'

.github/workflows/release.yml

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
1-
name: release
1+
name: Release
22
concurrency: release
33
on:
44
push:
5-
branches: [master, main]
65
tags: ["*"]
76
jobs:
87
release:
@@ -22,7 +21,7 @@ jobs:
2221
- run: echo $PGP_SECRET | base64 --decode | gpg --batch --import
2322
env:
2423
PGP_SECRET: ${{ secrets.PGP_SECRET }}
25-
- run: sbt test ciReleaseTagNextVersion ciReleaseSonatype
24+
- run: sbt test ciReleaseSonatype
2625
env:
2726
SONATYPE_PASSWORD: ${{ secrets.SONATYPE_PASSWORD }}
2827
SONATYPE_USERNAME: ${{ secrets.SONATYPE_USERNAME }}

.scalafmt.conf

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
style = defaultWithAlign
22
maxColumn = 120
3-
version = 3.9.10
3+
version = 3.10.0
44
assumeStandardLibraryStripMargin = true
55
align.stripMargin = true
66
runner.dialect = scala3

README.md

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -352,6 +352,56 @@ implicit val mapper2: TypeMapper[WrappedString, String] = new TypeMapper[Wrapped
352352
}
353353
```
354354

355+
### Ordering
356+
357+
`SortedSet` requires a type-information for its elements and also for the ordering of the elements. Type-information of default orderings is not implicitly available in the context because we cannot assume whether the user wants to use the natural ordering or a custom one.
358+
359+
Type-information of default orderings is available in `org.apache.flinkx.api.serializer.OrderingTypeInfo` and can be used as follows:
360+
```scala mdoc:reset-object
361+
import org.apache.flink.api.common.typeinfo.TypeInformation
362+
import org.apache.flinkx.api._
363+
import org.apache.flinkx.api.serializer.OrderingTypeInfo
364+
import org.apache.flinkx.api.serializers._
365+
import scala.collection.immutable.SortedSet
366+
367+
case class Foo(bars: SortedSet[String])
368+
369+
object Foo {
370+
implicit val fooInfo: TypeInformation[Foo] = {
371+
    // type-information for Ordering needs to be explicitly put in the context
372+
implicit val orderingStringInfo: TypeInformation[Ordering[String]] =
373+
OrderingTypeInfo.DefaultStringOrderingInfo
374+
deriveTypeInformation
375+
}
376+
}
377+
```
378+
379+
It's also possible to derive the type-information of a custom ordering if it's an ADT:
380+
```scala mdoc:reset-object
381+
import org.apache.flink.api.common.typeinfo.TypeInformation
382+
import org.apache.flinkx.api._
383+
import org.apache.flinkx.api.serializer.OrderingTypeInfo
384+
import org.apache.flinkx.api.serializers._
385+
import scala.collection.immutable.SortedSet
386+
387+
case class Bar(a: Int, b: String)
388+
389+
case object BarOrdering extends Ordering[Bar] {
390+
override def compare(x: Bar, y: Bar): Int = x.a.compare(y.a)
391+
}
392+
393+
case class Foo(bar: SortedSet[Bar])
394+
395+
object Foo {
396+
implicit val fooInfo: TypeInformation[Foo] = {
397+
// Derive the type-information of custom Bar ordering
398+
implicit val barOrderingInfo: TypeInformation[Ordering[Bar]] =
399+
OrderingTypeInfo.deriveOrdering[BarOrdering.type, Bar]
400+
deriveTypeInformation
401+
}
402+
}
403+
```
404+
355405
### Schema evolution
356406

357407
#### ADT

build.sbt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ import sbtrelease.ReleaseStateTransformations.*
33
Global / onChangedBuildSource := ReloadOnSourceChanges
44
Global / excludeLintKeys := Set(crossScalaVersions)
55

6-
lazy val rootScalaVersion = "3.3.6"
7-
lazy val crossVersions = Seq("2.13.16", rootScalaVersion)
6+
lazy val rootScalaVersion = "3.3.7"
7+
lazy val crossVersions = Seq("2.13.17", rootScalaVersion)
88
lazy val flinkVersion1 = System.getProperty("flinkVersion1", "1.20.2")
99
lazy val flinkVersion2 = System.getProperty("flinkVersion2", "2.0.0")
1010

modules/examples/scripts/gen-csv-file.sc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
//> using dep "org.flinkextended::flink-scala-api-1:1.2.9"
1+
//> using dep "org.flinkextended::flink-scala-api-1:2.0.4"
22
//> using dep "org.apache.flink:flink-clients:1.20.1"
33
//> using dep "org.apache.flink:flink-csv:1.20.1"
44
//> using dep "org.apache.flink:flink-connector-files:1.20.1"

modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/CaseClassSerializer.scala

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSnap
2222
import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase
2323
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
2424
import org.apache.flink.types.NullFieldException
25+
import org.apache.flinkx.api.{NullMarker, VariableLengthDataType}
2526
import org.apache.flinkx.api.serializer.CaseClassSerializer.EmptyByteArray
2627
import org.slf4j.{Logger, LoggerFactory}
2728

@@ -103,11 +104,13 @@ class CaseClassSerializer[T <: Product](
103104
createInstance(fields)
104105
}
105106

106-
override val getLength: Int = if (super.getLength == -1) -1 else super.getLength + 4 // +4 bytes for the arity field
107+
override val getLength: Int =
108+
if (super.getLength == VariableLengthDataType) VariableLengthDataType
109+
else super.getLength + 4 // +4 bytes for the arity field
107110

108111
def serialize(value: T, target: DataOutputView): Unit = {
109-
// Write an arity of -1 to indicate null value
110-
val sourceArity = if (value == null) -1 else arity
112+
// Write a negative arity to indicate null value
113+
val sourceArity = if (value == null) NullMarker else arity
111114
target.writeInt(sourceArity)
112115
if (value == null) target.write(nullPadding)
113116

@@ -129,7 +132,7 @@ class CaseClassSerializer[T <: Product](
129132

130133
def deserialize(source: DataInputView): T = {
131134
val sourceArity = source.readInt()
132-
if (sourceArity == -1) {
135+
if (sourceArity < 0) {
133136
source.skipBytesToRead(nullPadding.length)
134137
null.asInstanceOf[T]
135138
} else {
@@ -146,7 +149,7 @@ class CaseClassSerializer[T <: Product](
146149
override def copy(source: DataInputView, target: DataOutputView): Unit = {
147150
val sourceArity = source.readInt()
148151
target.writeInt(sourceArity)
149-
if (sourceArity == -1) {
152+
if (sourceArity < 0) {
150153
source.skipBytesToRead(nullPadding.length)
151154
target.skipBytesToWrite(nullPadding.length)
152155
} else {

modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/CollectionSerializerSnapshot.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,9 @@ class CollectionSerializerSnapshot[F[_], T, S <: TypeSerializer[F[T]]](
6969
case "long" => out.writeUTF("java.lang.Long")
7070
case "byte" => out.writeUTF("java.lang.Byte")
7171
case "short" => out.writeUTF("java.lang.Short")
72-
case "char" => out.writeUTF("java.lang.Char")
72+
case "char" => out.writeUTF("java.lang.Character")
7373
case "boolean" => out.writeUTF("java.lang.Boolean")
74+
case "void" => out.writeUTF("java.lang.Void")
7475
case other => out.writeUTF(other)
7576
}
7677
TypeSerializerSnapshot.writeVersionedSnapshot(out, nestedSerializer.snapshotConfiguration())
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package org.apache.flinkx.api.serializer
2+
3+
import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSnapshot}
4+
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
5+
6+
/** Generic serializer snapshot for sorted collection.
7+
* @param aSerializer
8+
* the serializer of `A`
9+
* @param aOrderingSerializer
10+
* the serializer of `Ordering[A]`
11+
* @param sClass
12+
* the class of `S`
13+
* @param aClass
14+
* the class of `A`
15+
* @tparam F
16+
* the type of the serialized collection
17+
* @tparam A
18+
* the type of the collection's elements
19+
* @tparam S
20+
* the type of the collection serializer
21+
*/
22+
class SortedCollectionSerializerSnapshot[F[_], A, S <: TypeSerializer[F[A]]](
23+
aSerializer: TypeSerializer[A],
24+
sClass: Class[S],
25+
aClass: Class[A],
26+
private var aOrderingSerializer: TypeSerializer[Ordering[A]]
27+
) extends CollectionSerializerSnapshot[F, A, S](aSerializer, sClass, aClass) {
28+
29+
// Empty constructor is required to instantiate this class during deserialization.
30+
def this() = this(null, null, null, null)
31+
32+
override def getCurrentVersion: Int = super.getCurrentVersion // Must be aligned with CollectionSerializerSnapshot
33+
34+
override def writeSnapshot(out: DataOutputView): Unit = {
35+
super.writeSnapshot(out)
36+
TypeSerializerSnapshot.writeVersionedSnapshot(out, aOrderingSerializer.snapshotConfiguration())
37+
}
38+
39+
override def readSnapshot(readVersion: Int, in: DataInputView, userCodeClassLoader: ClassLoader): Unit = {
40+
super.readSnapshot(readVersion, in, userCodeClassLoader)
41+
aOrderingSerializer =
42+
TypeSerializerSnapshot.readVersionedSnapshot[Ordering[A]](in, userCodeClassLoader).restoreSerializer()
43+
}
44+
45+
override def restoreSerializer(): TypeSerializer[F[A]] = {
46+
val constructor = clazz.getConstructors()(0)
47+
constructor.newInstance(nestedSerializer, vclazz, aOrderingSerializer).asInstanceOf[TypeSerializer[F[A]]]
48+
}
49+
50+
}

0 commit comments

Comments
 (0)