Commit a20b7fe

drop scala 2.12 support, add deprecations, remove returns usage (#176)
* drop scala 2.12 support, add deprecations, remove returns usage
* add a note on the last Scala 2.12 build
1 parent 07e2f29 commit a20b7fe

File tree

17 files changed (+143, -68 lines)

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion

@@ -15,7 +15,7 @@ jobs:
     strategy:
       matrix:
         java: [11]
-        scala: [2.12.20, 2.13.15, 3.3.4]
+        scala: [2.13.15, 3.3.4]
         flink: [1.18.1, 1.19.1]
         include:
           - scala: 3.3.4

README.md

Lines changed: 21 additions & 5 deletions

@@ -1,12 +1,12 @@
-# Scala 2.12/2.13/3.x API for Apache Flink
+# Scala 2.13/3.x API for Apache Flink

 [![CI Status](https://github.com/flink-extended/flink-scala-api/workflows/CI/badge.svg)](https://github.com/flinkextended/flink-scala-api/actions)
-[![Maven Central](https://maven-badges.herokuapp.com/maven-central/org.flinkextended/flink-scala-api_2.12/badge.svg?style=plastic)](https://maven-badges.herokuapp.com/maven-central/org.flinkextended/flink-scala-api_2.12)
+[![Maven Central](https://maven-badges.herokuapp.com/maven-central/org.flinkextended/flink-scala-api_2.13/badge.svg?style=plastic)](https://maven-badges.herokuapp.com/maven-central/org.flinkextended/flink-scala-api_2.13)
 [![License: Apache 2](https://img.shields.io/badge/License-Apache2-green.svg)](https://opensource.org/licenses/Apache-2.0)
 ![Last commit](https://img.shields.io/github/last-commit/flink-extended/flink-scala-api)
 ![Last release](https://img.shields.io/github/release/flink-extended/flink-scala-api)

-This project is a community-maintained fork of official Apache Flink Scala API, cross-built for scala 2.12, 2.13 and 3.x.
+This project is a community-maintained fork of official Apache Flink Scala API, cross-built for scala 2.13 and 3.x.

 ## Migration

@@ -26,19 +26,35 @@ import org.apache.flinkx.api.serializers._

 ## Usage

-`flink-scala-api` is released to Maven-central for 2.12, 2.13 and 3. For SBT, add this snippet to `build.sbt`:
+`flink-scala-api` is released to Maven-central for 2.13 and 3. For SBT, add this snippet to `build.sbt`:
 ```scala
 libraryDependencies += "org.flinkextended" %% "flink-scala-api" % "1.18.1_1.1.6"
 ```

-For Ammonite:
+## For Ammonite

 ```scala
 import $ivy.`org.flinkextended::flink-scala-api:1.18.1_1.1.6`
 // you might need flink-client too in order to run in the REPL
 import $ivy.`org.apache.flink:flink-clients:1.18.1`
 ```

+## For Scala 2.12
+
+If you want first to migrate to org.flinkextended:flink-scala-api staying on Scala 2.12, you can use the last build for Scala 2.12:
+
+```scala
+libraryDependencies += "org.flinkextended" %% "flink-scala-api" % "1.18.1_1.2.0"
+// or
+"org.flinkextended" %% "flink-scala-api" % "1.19.1_1.2.0"
+// or
+"org.flinkextended" %% "flink-scala-api" % "1.20.0_1.2.0"
+```
+
+Build for Scala 2.12 is no longer published.
+
+## SBT Project Template
+
 If you want to create new project easily check this __Giter8 template__ out: [novakov-alexey/flink-scala-api.g8](https://github.com/novakov-alexey/flink-scala-api.g8)

 ## Supported Flink versions
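
Editor's note on the new "For Scala 2.12" section: in a project that still cross-builds for 2.12, the pin can be scoped to that axis only. A hypothetical `build.sbt` fragment, reusing the release numbers from the README above (`CrossVersion.partialVersion` is standard sbt API):

```scala
// Hypothetical fragment: keep the last Scala 2.12 build (the 1.2.0 line) on
// the 2.12 axis and the release line from the Usage section everywhere else.
libraryDependencies += {
  CrossVersion.partialVersion(scalaVersion.value) match {
    case Some((2, 12)) => "org.flinkextended" %% "flink-scala-api" % "1.18.1_1.2.0"
    case _             => "org.flinkextended" %% "flink-scala-api" % "1.18.1_1.1.6"
  }
}
```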

build.sbt

Lines changed: 2 additions & 1 deletion

@@ -9,6 +9,7 @@ lazy val flinkVersion = System.getProperty("flinkVersion", "1.18.1")
 lazy val root = (project in file("."))
   .aggregate(`scala-api`, `examples`)
   .settings(
+    scalaVersion := rootScalaVersion,
     publish / skip := true
   )

@@ -17,7 +18,7 @@ lazy val `scala-api` = (project in file("modules/scala-api"))
   .settings(
     name := "flink-scala-api",
     scalaVersion := rootScalaVersion,
-    crossScalaVersions := Seq("2.12.20", "2.13.15", rootScalaVersion),
+    crossScalaVersions := Seq("2.13.15", rootScalaVersion),
     libraryDependencies ++= Seq(
       "org.apache.flink" % "flink-streaming-java" % flinkVersion,
       "org.apache.flink" % "flink-java" % flinkVersion,

modules/scala-api/src/main/scala-3/org/apache/flinkx/api/TaggedDerivation.scala

Lines changed: 1 addition & 0 deletions

@@ -30,6 +30,7 @@ trait CommonTaggedDerivation[TypeClass[_]]:
       )*
   )

+  @annotation.nowarn
   val caseClass = new CaseClass[Typeclass, A](
     typeInfo[A],
     isObject[A],
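
The new `@annotation.nowarn` silences a compiler warning at the `caseClass` construction site. As a general Scala note (not this commit's code), the annotation also accepts a filter so that only one warning category is suppressed; a minimal sketch:

```scala
import scala.annotation.nowarn

object NowarnDemo {
  @deprecated("use a newer greeting", "1.0")
  def oldGreet(): String = "hi"

  // "cat=deprecation" restricts the suppression to deprecation warnings,
  // so any other warning produced inside greet would still surface.
  @nowarn("cat=deprecation")
  def greet(): String = oldGreet()
}
```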

modules/scala-api/src/main/scala/org/apache/flinkx/api/ClosureCleaner.scala

Lines changed: 27 additions & 16 deletions

@@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory

 import scala.collection.mutable
 import scala.jdk.CollectionConverters._
+import scala.annotation.tailrec

 /** A cleaner that renders closures serializable if they can be done so safely.
   */
@@ -63,20 +64,27 @@ object ClosureCleaner {
   // not a good idea (whereas we can clone closure objects just fine since we
   // understand how all their fields are used).
   private def getOuterClassesAndObjects(obj: AnyRef): (List[Class[_]], List[AnyRef]) = {
-    for (f <- obj.getClass.getDeclaredFields if f.getName == "$outer") {
-      f.setAccessible(true)
-      val outer = f.get(obj)
-      // The outer pointer may be null if we have cleaned this closure before
-      if (outer != null) {
-        if (isClosure(f.getType)) {
-          val recurRet = getOuterClassesAndObjects(outer)
-          return (f.getType :: recurRet._1, outer :: recurRet._2)
-        } else {
-          return (f.getType :: Nil, outer :: Nil) // Stop at the first $outer that is not a closure
-        }
+
+    @tailrec
+    def loop(fields: List[Field]): (List[Class[_]], List[AnyRef]) =
+      fields match {
+        case f :: tail =>
+          f.setAccessible(true)
+          val outer = f.get(obj)
+          // The outer pointer may be null if we have cleaned this closure before
+          if (outer != null) {
+            if (isClosure(f.getType)) {
+              val recurRet = getOuterClassesAndObjects(outer)
+              (f.getType :: recurRet._1, outer :: recurRet._2)
+            } else {
+              (f.getType :: Nil, outer :: Nil) // Stop at the first $outer that is not a closure
+            }
+          } else loop(tail)
+        case Nil => (Nil, Nil)
       }
-    }
-    (Nil, Nil)
+
+    val fields = obj.getClass.getDeclaredFields.filter(_.getName == "$outer").toList
+    loop(fields)
   }

   /** Return a list of classes that represent closures enclosed in the given closure object.
@@ -89,17 +97,20 @@ object ClosureCleaner {
       if (cr != null) {
         val set = mutable.Set.empty[Class[_]]
         cr.accept(new InnerClosureFinder(set), 0)
-        for (cls <- set -- seen) {
+        for (cls <- set.toSet -- seen) {
           seen += cls
           stack.push(cls)
         }
       }
     }
-    (seen - obj.getClass).toList
+    (seen.toSet - obj.getClass).toList
   }

   /** Initializes the accessed fields for outer classes and their super classes. */
-  private def initAccessedFields(accessedFields: mutable.Map[Class[_], mutable.Set[String]], outerClasses: Seq[Class[_]]): Unit = {
+  private def initAccessedFields(
+      accessedFields: mutable.Map[Class[_], mutable.Set[String]],
+      outerClasses: Seq[Class[_]]
+  ): Unit = {
     for (cls <- outerClasses) {
       var currentClass = cls
       assert(currentClass != null, "The outer class can't be null.")
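
Context for the "remove returns usage" part of this commit: a `return` inside a Scala lambda or `for` body is non-local and is implemented by throwing `NonLocalReturnControl`, which is fragile in code that serializes and ships closures, and it draws warnings from recent compilers. The `@tailrec` helper above expresses the same early exit with plain values; a minimal standalone sketch of the pattern (illustrative, not project code):

```scala
import scala.annotation.tailrec

// Early exit without `return`: the recursion stops as soon as a match is
// found, and @tailrec guarantees it compiles to a plain loop, not a throw.
def firstEven(xs: List[Int]): Option[Int] = {
  @tailrec
  def loop(rest: List[Int]): Option[Int] = rest match {
    case x :: _ if x % 2 == 0 => Some(x)
    case _ :: tail            => loop(tail)
    case Nil                  => None
  }
  loop(xs)
}
```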

modules/scala-api/src/main/scala/org/apache/flinkx/api/ConnectedStreams.scala

Lines changed: 3 additions & 3 deletions

@@ -189,7 +189,7 @@ class ConnectedStreams[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
    * @return
    *   The resulting data stream.
    */
-  def flatMap[R: TypeInformation](fun1: IN1 => TraversableOnce[R], fun2: IN2 => TraversableOnce[R]): DataStream[R] = {
+  def flatMap[R: TypeInformation](fun1: IN1 => IterableOnce[R], fun2: IN2 => IterableOnce[R]): DataStream[R] = {

     if (fun1 == null || fun2 == null) {
       throw new NullPointerException("FlatMap functions must not be null.")
@@ -198,8 +198,8 @@ class ConnectedStreams[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
     val cleanFun2 = clean(fun2)

     val flatMapper = new CoFlatMapFunction[IN1, IN2, R] {
-      def flatMap1(value: IN1, out: Collector[R]): Unit = { cleanFun1(value).foreach(out.collect) }
-      def flatMap2(value: IN2, out: Collector[R]): Unit = { cleanFun2(value).foreach(out.collect) }
+      def flatMap1(value: IN1, out: Collector[R]): Unit = { cleanFun1(value).iterator.foreach(out.collect) }
+      def flatMap2(value: IN2, out: Collector[R]): Unit = { cleanFun2(value).iterator.foreach(out.collect) }
     }

     flatMap(flatMapper)
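
Background for the recurring `.iterator.foreach` edit in this commit: Scala 2.13 replaced `TraversableOnce` with `IterableOnce`, and the new trait itself only guarantees an `iterator` method, so collecting elements portably goes through the iterator. A sketch (not project code):

```scala
// Any IterableOnce, strict or lazy, exposes the same minimal surface:
// an iterator. Going through it works for List, Iterator, LazyList, etc.
val once: IterableOnce[Int] = List(1, 2, 3)
once.iterator.foreach(println) // prints 1, 2, 3
```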

modules/scala-api/src/main/scala/org/apache/flinkx/api/DataStream.scala

Lines changed: 14 additions & 5 deletions

@@ -560,13 +560,13 @@ class DataStream[T](stream: JavaStream[T]) {

   /** Creates a new DataStream by applying the given function to every element and flattening the results.
     */
-  def flatMap[R: TypeInformation](fun: T => TraversableOnce[R]): DataStream[R] = {
+  def flatMap[R: TypeInformation](fun: T => IterableOnce[R]): DataStream[R] = {
     if (fun == null) {
       throw new NullPointerException("FlatMap function must not be null.")
     }
     val cleanFun = clean(fun)
     val flatMapper = new FlatMapFunction[T, R] {
-      def flatMap(in: T, out: Collector[R]): Unit = { cleanFun(in).foreach(out.collect) }
+      def flatMap(in: T, out: Collector[R]): Unit = { cleanFun(in).iterator.foreach(out.collect) }
     }
     flatMap(flatMapper)
   }
@@ -690,8 +690,10 @@ class DataStream[T](stream: JavaStream[T]) {
     * For cases where the timestamps are not monotonously increasing, use the more general methods
     * [[assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks)]] and
     * [[assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks)]].
+    *
+    * @deprecated Please use {@link #assignTimestampsAndWatermarks(WatermarkStrategy)} instead.
     */
-  @PublicEvolving
+  @Deprecated
   def assignAscendingTimestamps(extractor: T => Long): DataStream[T] = {
     val cleanExtractor = clean(extractor)
     val extractorFunction = new AscendingTimestampExtractor[T] {
@@ -754,8 +756,12 @@ class DataStream[T](stream: JavaStream[T]) {
   def printToErr(sinkIdentifier: String): DataStreamSink[T] = stream.printToErr(sinkIdentifier)

   /** Writes a DataStream using the given [[OutputFormat]].
-    */
-  @PublicEvolving
+    *
+    * @deprecated Please use the {@link
+    *   org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink} explicitly
+    *   using the {@link #addSink(SinkFunction)} method.
+    */
+  @Deprecated
   def writeUsingOutputFormat(format: OutputFormat[T]): DataStreamSink[T] = {
     stream.writeUsingOutputFormat(format)
   }
@@ -790,7 +796,10 @@ class DataStream[T](stream: JavaStream[T]) {

   /** Adds the given sink to this DataStream. Only streams with sinks added will be executed once the
     * StreamExecutionEnvironment.execute(...) method is called.
+    *
+    * @deprecated Please use the sinkTo(sink: org.apache.flink.api.connector.sink2.Sink[T])
     */
+  @Deprecated
   def sinkTo(sink: org.apache.flink.api.connector.sink.Sink[T, _, _, _]): DataStreamSink[T] =
     stream.sinkTo(sink)
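
The `@deprecated` note on `assignAscendingTimestamps` points at the `WatermarkStrategy`-based API. A hedged sketch of the replacement, assuming a hypothetical `events: DataStream[Event]` with a `ts: Long` field:

```scala
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}

case class Event(ts: Long, payload: String)

// Rough equivalent of assignAscendingTimestamps(_.ts) in the new API:
// monotonous watermarks plus an explicit per-record timestamp extractor.
val withTimestamps = events.assignTimestampsAndWatermarks(
  WatermarkStrategy
    .forMonotonousTimestamps[Event]()
    .withTimestampAssigner(new SerializableTimestampAssigner[Event] {
      override def extractTimestamp(e: Event, recordTimestamp: Long): Long = e.ts
    })
)
```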

modules/scala-api/src/main/scala/org/apache/flinkx/api/KeyedStream.scala

Lines changed: 13 additions & 6 deletions

@@ -460,7 +460,7 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
    * Note that the user state object needs to be serializable.
    */
   def flatMapWithState[R: TypeInformation, S: TypeInformation](
-      fun: (T, Option[S]) => (TraversableOnce[R], Option[S])
+      fun: (T, Option[S]) => (IterableOnce[R], Option[S])
   ): DataStream[R] = {
     if (fun == null) {
       throw new NullPointerException("Flatmap function must not be null.")
@@ -470,12 +470,12 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
     val stateTypeInfo: TypeInformation[S] = implicitly[TypeInformation[S]]
     val serializer: TypeSerializer[S] = stateTypeInfo.createSerializer(javaStream.getExecutionConfig)

-    val flatMapper = new RichFlatMapFunction[T, R] with StatefulFunction[T, TraversableOnce[R], S] {
+    val flatMapper = new RichFlatMapFunction[T, R] with StatefulFunction[T, IterableOnce[R], S] {

       override val stateSerializer: TypeSerializer[S] = serializer

       override def flatMap(in: T, out: Collector[R]): Unit = {
-        applyWithState(in, cleanFun).foreach(out.collect)
+        applyWithState(in, cleanFun).iterator.foreach(out.collect)
       }
     }

@@ -488,8 +488,10 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
    *   Name under which to the publish the queryable state instance
    * @return
    *   Queryable state instance
+   * @deprecated The Queryable State feature is deprecated since Flink 1.18, and will be removed in a
+   *   future Flink major version.
    */
-  @PublicEvolving
+  @Deprecated
   def asQueryableState(queryableStateName: String): QueryableStateStream[K, T] = {
     val stateDescriptor = new ValueStateDescriptor(queryableStateName, dataType.createSerializer(executionConfig))

@@ -504,8 +506,11 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
    *   State descriptor to create state instance from
    * @return
    *   Queryable state instance
+   *
+   * @deprecated The Queryable State feature is deprecated since Flink 1.18, and will be removed in a
+   *   future Flink major version.
    */
-  @PublicEvolving
+  @Deprecated
   def asQueryableState(
       queryableStateName: String,
       stateDescriptor: ValueStateDescriptor[T]
@@ -529,8 +534,10 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
    *   State descriptor to create state instance from
    * @return
    *   Queryable state instance
+   * @deprecated The Queryable State feature is deprecated since Flink 1.18, and will be removed in a
+   *   future Flink major version.
    */
-  @PublicEvolving
+  @Deprecated
   def asQueryableState(
       queryableStateName: String,
       stateDescriptor: ReducingStateDescriptor[T]
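
For callers, the `TraversableOnce` to `IterableOnce` change in `flatMapWithState` is mechanical, since any collection still satisfies the bound. A minimal sketch assuming a hypothetical `keyed: KeyedStream[Int, String]` and the implicit serializers from the project's own import:

```scala
import org.apache.flinkx.api.serializers._

// Running sum per key: emit the updated sum and store it back as state.
// List[Int] satisfies the new IterableOnce[R] bound just as it satisfied
// TraversableOnce[R] before.
val sums = keyed.flatMapWithState[Int, Int] { (in, state) =>
  val sum = state.getOrElse(0) + in
  (List(sum), Some(sum))
}
```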

modules/scala-api/src/main/scala/org/apache/flinkx/api/StreamExecutionEnvironment.scala

Lines changed: 27 additions & 2 deletions

@@ -487,19 +487,29 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {

   /** Creates a DataStream that represents the Strings produced by reading the given file line wise. The file will be
     * read with the system's default character set.
+    *
+    * @deprecated Use {@code
+    *   FileSource#forRecordStreamFormat()/forBulkFileFormat()/forRecordFileFormat() instead}.
     */
+  @Deprecated
   def readTextFile(filePath: String): DataStream[String] =
     asScalaStream(javaEnv.readTextFile(filePath))

   /** Creates a data stream that represents the Strings produced by reading the given file line wise. The character set
     * with the given name will be used to read the files.
+    * @deprecated Use {@code
+    *   FileSource#forRecordStreamFormat()/forBulkFileFormat()/forRecordFileFormat() instead}.
     */
+  @Deprecated
   def readTextFile(filePath: String, charsetName: String): DataStream[String] =
     asScalaStream(javaEnv.readTextFile(filePath, charsetName))

   /** Reads the given file with the given input format. The file path should be passed as a URI (e.g.,
     * "file:///some/local/file" or "hdfs://host:port/file/path").
+    * @deprecated Use {@code
+    *   FileSource#forRecordStreamFormat()/forBulkFileFormat()/forRecordFileFormat() instead}.
     */
+  @Deprecated
   def readFile[T: TypeInformation](inputFormat: FileInputFormat[T], filePath: String): DataStream[T] =
     asScalaStream(javaEnv.readFile(inputFormat, filePath))

@@ -526,8 +536,11 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    *   In the case of periodic path monitoring, this specifies the interval (in millis) between consecutive path scans
    * @return
    *   The data stream that represents the data read from the given file
+   *
+   * @deprecated Use {@code
+   *   FileSource#forRecordStreamFormat()/forBulkFileFormat()/forRecordFileFormat() instead}.
    */
-  @PublicEvolving
+  @Deprecated
   def readFile[T: TypeInformation](
       inputFormat: FileInputFormat[T],
       filePath: String,
@@ -563,7 +576,13 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    * have a parallelism of 1. To enable parallel execution, the user defined source should implement
    * ParallelSourceFunction or extend RichParallelSourceFunction. In these cases the resulting source will have the
    * parallelism of the environment. To change this afterwards call DataStreamSource.setParallelism(int)
+   *
+   * @deprecated This method relies on the {@link
+   *   org.apache.flink.streaming.api.functions.source.SourceFunction} API, which is due to be
+   *   removed. Use the {@link #fromSource[TypeInformation](Source, WatermarkStrategy, String)}
+   *   method based on the new {@link org.apache.flink.api.connector.source.Source} API instead.
    */
+  @Deprecated
   def addSource[T: TypeInformation](function: SourceFunction[T]): DataStream[T] = {
     require(function != null, "Function must not be null.")

@@ -573,7 +592,13 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
   }

   /** Create a DataStream using a user defined source function for arbitrary source functionality.
-    */
+    *
+    * @deprecated This method relies on the {@link
+    *   org.apache.flink.streaming.api.functions.source.SourceFunction} API, which is due to be
+    *   removed. Use the {@link #fromSource[TypeInformation](Source, WatermarkStrategy, String)}
+    *   method based on the new {@link org.apache.flink.api.connector.source.Source} API instead.
+    */
+  @Deprecated
   def addSource[T: TypeInformation](function: SourceContext[T] => Unit): DataStream[T] = {
     require(function != null, "Function must not be null.")
     val sourceFunction = new SourceFunction[T] {
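
All of the `readTextFile`/`readFile` deprecation notes above point to `FileSource`. A hedged sketch of the replacement path, assuming the separate `flink-connector-files` artifact is on the classpath and `env` is a `StreamExecutionEnvironment` (the path string is the example URI from the scaladoc above):

```scala
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.connector.file.src.FileSource
import org.apache.flink.connector.file.src.reader.TextLineInputFormat
import org.apache.flink.core.fs.Path
import org.apache.flinkx.api.serializers._

// Line-by-line text reading via the unified Source API instead of the
// deprecated readTextFile.
val source = FileSource
  .forRecordStreamFormat(new TextLineInputFormat(), new Path("file:///some/local/file"))
  .build()

val lines = env.fromSource(source, WatermarkStrategy.noWatermarks(), "text-lines")
```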

modules/scala-api/src/main/scala/org/apache/flinkx/api/extensions/impl/acceptPartialFunctions/OnConnectedStream.scala

Lines changed: 2 additions & 2 deletions

@@ -63,8 +63,8 @@ class OnConnectedStream[IN1, IN2](stream: ConnectedStreams[IN1, IN2]) {
     */
   @PublicEvolving
   def flatMapWith[R: TypeInformation](
-      flatMap1: IN1 => TraversableOnce[R],
-      flatMap2: IN2 => TraversableOnce[R]
+      flatMap1: IN1 => IterableOnce[R],
+      flatMap2: IN2 => IterableOnce[R]
   ): DataStream[R] =
     stream.flatMap(flatMap1, flatMap2)
