Skip to content

Commit 5dd9508

Browse files
feat: fail-fast on Scala type resolution with Class
1 parent 1ba841b commit 5dd9508

File tree

4 files changed

+98
-7
lines changed

4 files changed

+98
-7
lines changed

README.md

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,8 @@ However, this project cannot enforce TypeInformation usage in the Flink Java API
187187

188188
Using this code may lead to a silent fallback to Kryo.
189189

190+
From Flink 1.19, a check is done to detect this misuse. To disable it, see [Disable fail-fast on Scala type resolution with Class feature flag](#disable-fail-fast-on-scala-type-resolution-with-class).
191+
190192
> [!WARNING]
191193
> Official `flink-scala` deprecated dependency contains Scala-specialized Kryo serializers. If this dependency is removed from the classpath (see [Supported Flink versions](#supported-flink-versions)), usage of Kryo with Scala classes leads to erroneous re-instantiations of `object` and `case object` singletons.
192194
>
@@ -402,7 +404,7 @@ env
402404

403405
In the [1.1.5 release](https://github.com/flink-extended/flink-scala-api/releases/tag/v1.18.1_1.1.5) the Case Class serialization process also [stores case class arity](https://github.com/flink-extended/flink-scala-api/pull/98/files#diff-e896c210d6a754cb3afb462aea34cca08f090330f6f3c663a64dfb5584fc3727R106) number to
404406
a savepoint. This was introduced to support Case Class schema evolution and to allow adding new
405-
class fields with default values. However, unfortunatelly this is the breaking change to the Flink job state restore process. Flink job will fail, if
407+
class fields with default values. Unfortunately, this is a breaking change to the Flink job state restore process. A Flink job will fail if
406408
a savepoint used for the job restore was created by 1.1.4 or earlier releases.
407409

408410
In order to migrate to the 1.1.5 release version, one can use a specially added environment variable:
@@ -412,12 +414,18 @@ To disable new savepoint format and be able to restore a Flink job with a savepo
412414

413415
Example: `DISABLE_CASE_CLASS_ARITY_USAGE = true`
414416

415-
To enbale new serialization logic set this variable to `false` or simply do not define this envrionment vairable.
417+
To enable the new serialization logic, set this variable to `false` or simply do not define this environment variable.
416418

417419
Example: `DISABLE_CASE_CLASS_ARITY_USAGE = false`
418420

419421
P.S. This flag may be deprecated in the future, once most users have migrated to the latest library version.
420422

423+
### Disable fail-fast on Scala type resolution with Class
424+
425+
From the [1.3.0 release](https://github.com/flink-extended/flink-scala-api/releases/tag/v1.19.1_1.3.0), a check is done to prevent misuse of Scala type resolution with `Class`, which may lead to a silent fallback to generic Kryo serializers.
426+
427+
You can disable this check with the `DISABLE_FAIL_FAST_ON_SCALA_TYPE_RESOLUTION_WITH_CLASS` environment variable set to `true`.
428+
421429
## Release
422430

423431
Create SBT file at ~/.sbt/1.0/sonatype.sbt with the following content:

modules/scala-api/src/main/scala/org/apache/flinkx/api/StreamExecutionEnvironment.scala

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ import org.apache.flink.api.connector.source.{Source, SourceSplit}
3232
import org.apache.flink.api.java.tuple
3333
import org.apache.flink.api.java.typeutils.ResultTypeQueryable
3434
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
35-
import org.apache.flink.configuration.{Configuration, ReadableConfig}
35+
import org.apache.flink.configuration.{ConfigOptions, Configuration, ReadableConfig}
3636
import org.apache.flink.core.execution.{JobClient, JobListener}
3737
import org.apache.flink.core.fs.Path
3838
import org.apache.flink.runtime.state.StateBackend
@@ -48,6 +48,7 @@ import java.net.URI
4848
import java.util
4949
import scala.jdk.CollectionConverters._
5050
import scala.language.implicitConversions
51+
import scala.util.Try
5152

5253
@Public
5354
class StreamExecutionEnvironment(javaEnv: JavaEnv) {
@@ -826,7 +827,9 @@ object StreamExecutionEnvironment {
826827
* cluster.
827828
*/
828829
def getExecutionEnvironment: StreamExecutionEnvironment = {
829-
new StreamExecutionEnvironment(JavaEnv.getExecutionEnvironment)
830+
val configuration = new Configuration
831+
configureFailFastOnScalaTypeResolutionWithClass(configuration)
832+
new StreamExecutionEnvironment(JavaEnv.getExecutionEnvironment(configuration))
830833
}
831834

832835
/** Creates an execution environment that represents the context in which the program is currently executed.
@@ -835,6 +838,7 @@ object StreamExecutionEnvironment {
835838
* Pass a custom configuration into the cluster.
836839
*/
837840
def getExecutionEnvironment(configuration: Configuration): StreamExecutionEnvironment = {
841+
configureFailFastOnScalaTypeResolutionWithClass(configuration)
838842
new StreamExecutionEnvironment(JavaEnv.getExecutionEnvironment(configuration))
839843
}
840844

@@ -849,7 +853,9 @@ object StreamExecutionEnvironment {
849853
* [[setDefaultLocalParallelism(Int)]].
850854
*/
851855
def createLocalEnvironment(parallelism: Int = JavaEnv.getDefaultLocalParallelism): StreamExecutionEnvironment = {
852-
new StreamExecutionEnvironment(JavaEnv.createLocalEnvironment(parallelism))
856+
val configuration = new Configuration
857+
configureFailFastOnScalaTypeResolutionWithClass(configuration)
858+
new StreamExecutionEnvironment(JavaEnv.createLocalEnvironment(parallelism, configuration))
853859
}
854860

855861
/** Creates a local execution environment. The local execution environment will run the program in a multi-threaded
@@ -861,6 +867,7 @@ object StreamExecutionEnvironment {
861867
* Pass a custom configuration into the cluster.
862868
*/
863869
def createLocalEnvironment(parallelism: Int, configuration: Configuration): StreamExecutionEnvironment = {
870+
configureFailFastOnScalaTypeResolutionWithClass(configuration)
864871
new StreamExecutionEnvironment(JavaEnv.createLocalEnvironment(parallelism, configuration))
865872
}
866873

@@ -880,6 +887,7 @@ object StreamExecutionEnvironment {
880887
@PublicEvolving
881888
def createLocalEnvironmentWithWebUI(config: Configuration = null): StreamExecutionEnvironment = {
882889
val conf: Configuration = if (config == null) new Configuration() else config
890+
configureFailFastOnScalaTypeResolutionWithClass(conf)
883891
new StreamExecutionEnvironment(JavaEnv.createLocalEnvironmentWithWebUI(conf))
884892
}
885893

@@ -901,7 +909,9 @@ object StreamExecutionEnvironment {
901909
* user-defined input formats, or any libraries, those must be provided in the JAR files.
902910
*/
903911
def createRemoteEnvironment(host: String, port: Int, jarFiles: String*): StreamExecutionEnvironment = {
904-
new StreamExecutionEnvironment(JavaEnv.createRemoteEnvironment(host, port, jarFiles: _*))
912+
val configuration = new Configuration
913+
configureFailFastOnScalaTypeResolutionWithClass(configuration)
914+
new StreamExecutionEnvironment(JavaEnv.createRemoteEnvironment(host, port, configuration, jarFiles: _*))
905915
}
906916

907917
/** Creates a remote execution environment. The remote environment sends (parts of) the program to a cluster for
@@ -924,7 +934,9 @@ object StreamExecutionEnvironment {
924934
parallelism: Int,
925935
jarFiles: String*
926936
): StreamExecutionEnvironment = {
927-
val javaEnv = JavaEnv.createRemoteEnvironment(host, port, jarFiles: _*)
937+
val configuration = new Configuration
938+
configureFailFastOnScalaTypeResolutionWithClass(configuration)
939+
val javaEnv = JavaEnv.createRemoteEnvironment(host, port, configuration, jarFiles: _*)
928940
javaEnv.setParallelism(parallelism)
929941
new StreamExecutionEnvironment(javaEnv)
930942
}
@@ -949,7 +961,27 @@ object StreamExecutionEnvironment {
949961
config: Configuration,
950962
jarFiles: String*
951963
): StreamExecutionEnvironment = {
964+
configureFailFastOnScalaTypeResolutionWithClass(config)
952965
val javaEnv = JavaEnv.createRemoteEnvironment(host, port, config, jarFiles: _*)
953966
new StreamExecutionEnvironment(javaEnv)
954967
}
968+
969+
/** Registers `org.apache.flinkx.api.typeinfo.FailFastTypeInfoFactory` in the `pipeline.serialization-config` option
  * of `configuration` for `scala.Product`, `scala.Option`, `scala.util.Either`, `scala.Array` and
  * `scala.collection.Iterable`, unless the check is disabled through
  * `isFailFastOnScalaTypeResolutionWithClassDisabled`.
  *
  * The operation is idempotent: an entry is only added when no existing entry already starts with the same class
  * key. This keeps the option free of duplicated class keys when the same `Configuration` instance is passed to
  * several environment factory methods, or when the user already configured one of these classes.
  * NOTE(review): Flink is believed to reject duplicated class keys in this option - confirm against the targeted
  * Flink version.
  *
  * @param configuration
  *   configuration to update in place; entries it already contains are preserved, in their original order
  */
private def configureFailFastOnScalaTypeResolutionWithClass(configuration: Configuration): Unit = {
  if (!isFailFastOnScalaTypeResolutionWithClassDisabled) {
    val serializationOption = ConfigOptions.key("pipeline.serialization-config").stringType().asList().noDefaultValue()
    // Work on a copy: the list handed back by the configuration is never mutated directly.
    val serializationConfig = new util.ArrayList[String](
      configuration.getOptional(serializationOption).orElse(util.Collections.emptyList[String]())
    )
    val failFastFactory = "org.apache.flinkx.api.typeinfo.FailFastTypeInfoFactory"
    val guardedTypes = Seq(
      "scala.Product",
      "scala.Option",
      "scala.util.Either",
      "scala.Array",
      "scala.collection.Iterable"
    )
    guardedTypes.foreach { guardedType =>
      // Match on the "<class>:" key so that both our own previous entry and a user-defined one are detected.
      val alreadyConfigured = serializationConfig.asScala.exists(_.trim.startsWith(s"$guardedType:"))
      if (!alreadyConfigured) {
        serializationConfig.add(s"$guardedType: {type: typeinfo, class: $failFastFactory}")
      }
    }
    configuration.set(serializationOption, serializationConfig)
  }
}
981+
982+
/** `true` only when the `DISABLE_FAIL_FAST_ON_SCALA_TYPE_RESOLUTION_WITH_CLASS` environment variable is defined and
  * parses as the boolean `true`. An undefined variable or a value that does not parse as a boolean yields `false`.
  * Being a `lazy val`, the environment is read once, on first access.
  */
private lazy val isFailFastOnScalaTypeResolutionWithClassDisabled: Boolean = {
  val rawFlag = sys.env.get("DISABLE_FAIL_FAST_ON_SCALA_TYPE_RESOLUTION_WITH_CLASS")
  // Unparsable values are dropped, so only an explicit boolean `true` disables the check.
  rawFlag.flatMap(value => Try(value.toBoolean).toOption).contains(true)
}
986+
955987
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package org.apache.flinkx.api.typeinfo
2+
3+
import org.apache.flink.api.common.typeinfo.{TypeInfoFactory, TypeInformation}
4+
import org.apache.flink.util.FlinkRuntimeException
5+
import org.apache.flinkx.api.typeinfo.FailFastTypeInfoFactory.formatType
6+
7+
import java.lang.reflect.Type
8+
import java.util
9+
import scala.jdk.CollectionConverters._
10+
11+
/** [[TypeInfoFactory]] that never produces a [[TypeInformation]]: every call to `createTypeInfo` throws a
  * [[FlinkRuntimeException]] explaining that a Scala type is being resolved from a `Class` and how to fix or disable
  * the check.
  */
class FailFastTypeInfoFactory extends TypeInfoFactory[Nothing] {

  /** Always fails.
    *
    * @param t
    *   type being resolved, reported in the error message
    * @param params
    *   generic parameters of `t`, reported in the error message when not empty
    * @throws FlinkRuntimeException
    *   on every invocation
    */
  override def createTypeInfo(t: Type, params: util.Map[String, TypeInformation[_]]): TypeInformation[Nothing] = {
    val described = formatType(t, params)
    val message =
      s"""You are using a 'Class' to resolve '$described' Scala type. flink-scala-api has no control over this kind of type resolution which may lead to silently fallback to generic Kryo serializers.
         |Use type information instead: import 'org.apache.flinkx.api.serializers._' to make implicitly available in the scope required 'TypeInformation' to resolve Scala types.
         |To disable this check, set 'DISABLE_FAIL_FAST_ON_SCALA_TYPE_RESOLUTION_WITH_CLASS' environment variable to 'true'.""".stripMargin
    throw new FlinkRuntimeException(message)
  }

}
21+
22+
object FailFastTypeInfoFactory {

  /** Renders a type for the error message.
    *
    * @param t
    *   type to render
    * @param params
    *   generic parameters of `t`; only the map keys are used (presumably type-variable names - verify against caller)
    * @return
    *   `t.getTypeName` when `params` is empty, otherwise `t.getTypeName` followed by the comma-separated keys of
    *   `params` enclosed in square brackets
    */
  private def formatType(t: Type, params: util.Map[String, TypeInformation[_]]): String = {
    val typeName = t.getTypeName
    if (params.isEmpty) typeName
    else params.keySet().asScala.mkString(typeName + "[", ", ", "]")
  }

}

modules/scala-api/src/test/scala/org/apache/flinkx/api/StreamExecutionEnvironmentTest.scala

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,12 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
55
import org.apache.flink.api.connector.source.Boundedness
66
import org.apache.flink.api.connector.source.mocks.MockSource
77
import org.apache.flink.api.java.typeutils.GenericTypeInfo
8+
import org.apache.flink.util.FlinkRuntimeException
89
import org.scalatest.flatspec.AnyFlatSpec
910
import org.scalatest.matchers.should.Matchers
1011

12+
import scala.util.{Failure, Success, Try}
13+
1114
class StreamExecutionEnvironmentTest extends AnyFlatSpec with Matchers with IntegrationTest {
1215

1316
it should "create a stream from a source" in {
@@ -31,6 +34,24 @@ class StreamExecutionEnvironmentTest extends AnyFlatSpec with Matchers with Inte
3134
stream.dataType shouldBe typeInfo
3235
}
3336

37+
"From Flink 1.19, TypeInformation.of(Class)" should "fail-fast trying to resolve Scala type" in {
  // The expected behaviour is selected by looking up PipelineOptions.SERIALIZATION_CONFIG reflectively.
  val serializationConfigField =
    Try(Class.forName("org.apache.flink.configuration.PipelineOptions").getField("SERIALIZATION_CONFIG"))

  serializationConfigField match {
    case Success(_) =>
      // From Flink 1.19: fail-fast at Scala type resolution
      val failure = intercept[FlinkRuntimeException](TypeInformation.of(classOf[Option[Int]]))
      failure.getMessage should startWith("You are using a 'Class' to resolve 'scala.Option' Scala type.")

    case Failure(_) =>
      // Before Flink 1.19: no fail-fast, exception happens at execution
      implicit val typeInfo: TypeInformation[Option[Int]] = TypeInformation.of(classOf[Option[Int]])
      val stream  = env.fromElements(Some(1), None, Some(100))
      val failure = intercept[UnsupportedOperationException](stream.executeAndCollect(3))
      failure.getMessage should startWith("Generic types have been disabled in the ExecutionConfig and type scala.Option is treated as a generic type.")
  }
}
54+
3455
// --------------------------------------------------------------------------
3556
// mocks
3657
// --------------------------------------------------------------------------

0 commit comments

Comments
 (0)