README.md (2 changes: 1 addition & 1 deletion)
@@ -435,7 +435,7 @@ P.S. this flag can be deprecated in future when most of the users migrate to the

### Disable fail-fast on Scala type resolution with Class

-From [1.3.0 release](https://github.com/flink-extended/flink-scala-api/releases/tag/v1.19.1_1.3.0), a check is done to prevent misuse of Scala type resolution with `Class`, which may lead to a silent fallback to generic Kryo serializers.
+From [1.2.3 release](https://github.com/flink-extended/flink-scala-api/releases/tag/v1.20.0_1.2.3), a check is done to prevent misuse of Scala type resolution with `Class`, which may lead to a silent fallback to generic Kryo serializers.

You can disable this check with the `DISABLE_FAIL_FAST_ON_SCALA_TYPE_RESOLUTION_WITH_CLASS` environment variable set to `true`.
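
To illustrate what the check described above guards against: resolving a Scala type from a bare `Class` goes through Flink's Java `TypeExtractor` instead of this library's implicit `TypeInformation` derivation, and can silently end up on the generic Kryo serializer. A minimal sketch of the pattern, assuming the library's usual imports (the `Order` case class is illustrative and not part of this change):

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flinkx.api.serializers._ // implicit TypeInformation derivation

object TypeResolutionExample {
  case class Order(id: Long, amount: Double)

  // Class-based resolution: analyzed by Flink's Java TypeExtractor, which may
  // silently fall back to a generic Kryo serializer for Scala types. With the
  // fail-fast factories registered, this is meant to fail early instead.
  val viaClass: TypeInformation[Order] = TypeInformation.of(classOf[Order])

  // Preferred: let flink-scala-api derive the TypeInformation at compile time.
  val derived: TypeInformation[Order] = implicitly[TypeInformation[Order]]
}
```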

StreamExecutionEnvironment.scala
@@ -32,7 +32,7 @@ import org.apache.flink.api.connector.source.{Source, SourceSplit}
import org.apache.flink.api.java.tuple
import org.apache.flink.api.java.typeutils.ResultTypeQueryable
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
-import org.apache.flink.configuration.{ConfigOptions, Configuration, ReadableConfig}
+import org.apache.flink.configuration.{Configuration, ReadableConfig}
import org.apache.flink.core.execution.{JobClient, JobListener}
import org.apache.flink.core.fs.Path
import org.apache.flink.runtime.state.StateBackend
@@ -43,7 +43,10 @@ import org.apache.flink.streaming.api.functions.source._
import org.apache.flink.streaming.api.graph.StreamGraph
import org.apache.flink.util.{SplittableIterator, TernaryBoolean}
import org.apache.flinkx.api.ScalaStreamOps._
+import org.apache.flinkx.api.typeinfo.FailFastTypeInfoFactory
+import org.slf4j.{Logger, LoggerFactory}

+import java.lang.reflect.Type
import java.net.URI
import java.util
import scala.jdk.CollectionConverters._
@@ -801,6 +804,8 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {

object StreamExecutionEnvironment {

+@transient lazy val log: Logger = LoggerFactory.getLogger(classOf[StreamExecutionEnvironment])

/** Sets the default parallelism that will be used for the local execution environment created by
* [[createLocalEnvironment()]].
*
@@ -827,8 +832,8 @@ object StreamExecutionEnvironment {
* cluster.
*/
def getExecutionEnvironment: StreamExecutionEnvironment = {
-val extraConfig = copyWithExtra(new Configuration)
-new StreamExecutionEnvironment(JavaEnv.getExecutionEnvironment(extraConfig))
+configureExtra()
+new StreamExecutionEnvironment(JavaEnv.getExecutionEnvironment)
}

/** Creates an execution environment that represents the context in which the program is currently executed.
@@ -837,8 +842,8 @@
* Pass a custom configuration into the cluster.
*/
def getExecutionEnvironment(configuration: Configuration): StreamExecutionEnvironment = {
-val extraConfig = copyWithExtra(configuration)
-new StreamExecutionEnvironment(JavaEnv.getExecutionEnvironment(extraConfig))
+configureExtra()
+new StreamExecutionEnvironment(JavaEnv.getExecutionEnvironment(configuration))
}

// --------------------------------------------------------------------------
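
For orientation, a typical entry point goes through the factory methods in the hunks above and therefore through the new `configureExtra()` call. A sketch of such a job, assuming the library's usual imports (the job body is a placeholder, not part of this change):

```scala
import org.apache.flinkx.api._
import org.apache.flinkx.api.serializers._

object ExampleJob {
  def main(args: Array[String]): Unit = {
    // getExecutionEnvironment registers the fail-fast TypeInfo factories
    // (on Flink 1.19+) before creating the underlying Java environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env
      .fromElements("flink", "scala", "api")
      .map(word => (word, 1))
      .print()

    env.execute("example-job")
  }
}
```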
@@ -852,8 +857,8 @@
* [[setDefaultLocalParallelism(Int)]].
*/
def createLocalEnvironment(parallelism: Int = JavaEnv.getDefaultLocalParallelism): StreamExecutionEnvironment = {
-val extraConfig = copyWithExtra(new Configuration)
-new StreamExecutionEnvironment(JavaEnv.createLocalEnvironment(parallelism, extraConfig))
+configureExtra()
+new StreamExecutionEnvironment(JavaEnv.createLocalEnvironment(parallelism))
}

/** Creates a local execution environment. The local execution environment will run the program in a multi-threaded
@@ -865,8 +870,8 @@
* Pass a custom configuration into the cluster.
*/
def createLocalEnvironment(parallelism: Int, configuration: Configuration): StreamExecutionEnvironment = {
-val extraConfig = copyWithExtra(configuration)
-new StreamExecutionEnvironment(JavaEnv.createLocalEnvironment(parallelism, extraConfig))
+configureExtra()
+new StreamExecutionEnvironment(JavaEnv.createLocalEnvironment(parallelism, configuration))
}

/** Creates a [[StreamExecutionEnvironment]] for local program execution that also starts the web monitoring UI.
@@ -884,8 +889,9 @@
*/
@PublicEvolving
def createLocalEnvironmentWithWebUI(config: Configuration = null): StreamExecutionEnvironment = {
-val extraConfig = copyWithExtra(if (config == null) new Configuration() else config)
-new StreamExecutionEnvironment(JavaEnv.createLocalEnvironmentWithWebUI(extraConfig))
+configureExtra()
+val conf: Configuration = if (config == null) new Configuration() else config
+new StreamExecutionEnvironment(JavaEnv.createLocalEnvironmentWithWebUI(conf))
}

// --------------------------------------------------------------------------
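
A possible local-development use of the web-UI variant above; the port value is arbitrary and the `flink-runtime-web` dependency is assumed to be on the classpath (neither is part of this change):

```scala
import org.apache.flink.configuration.{Configuration, RestOptions}
import org.apache.flinkx.api.StreamExecutionEnvironment
import org.apache.flinkx.api.serializers._

object LocalWebUiExample {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    conf.set(RestOptions.PORT, Int.box(8081)) // port for the local Flink web UI

    // Also registers the fail-fast TypeInfo factories via configureExtra().
    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)

    env.fromElements(1, 2, 3).print()
    env.execute("local-webui-example")
  }
}
```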
@@ -906,8 +912,8 @@
* user-defined input formats, or any libraries, those must be provided in the JAR files.
*/
def createRemoteEnvironment(host: String, port: Int, jarFiles: String*): StreamExecutionEnvironment = {
-val extraConfig = copyWithExtra(new Configuration)
-new StreamExecutionEnvironment(JavaEnv.createRemoteEnvironment(host, port, extraConfig, jarFiles: _*))
+configureExtra()
+new StreamExecutionEnvironment(JavaEnv.createRemoteEnvironment(host, port, jarFiles: _*))
}

/** Creates a remote execution environment. The remote environment sends (parts of) the program to a cluster for
@@ -930,8 +936,8 @@
parallelism: Int,
jarFiles: String*
): StreamExecutionEnvironment = {
-val extraConfig = copyWithExtra(new Configuration)
-val javaEnv = JavaEnv.createRemoteEnvironment(host, port, extraConfig, jarFiles: _*)
+configureExtra()
+val javaEnv = JavaEnv.createRemoteEnvironment(host, port, jarFiles: _*)
javaEnv.setParallelism(parallelism)
new StreamExecutionEnvironment(javaEnv)
}
@@ -956,42 +962,28 @@
config: Configuration,
jarFiles: String*
): StreamExecutionEnvironment = {
-val extraConfig = copyWithExtra(config)
-val javaEnv = JavaEnv.createRemoteEnvironment(host, port, extraConfig, jarFiles: _*)
+configureExtra()
+val javaEnv = JavaEnv.createRemoteEnvironment(host, port, config, jarFiles: _*)
new StreamExecutionEnvironment(javaEnv)
}

-/** Copy input config and add extra configuration:
+/** Add extra configuration:
* - register type info factories to fail-fast on Scala type resolution with Class
-* @param config
-* input configuration
-* @return
-* a copy of input config with extra configuration
*/
-private def copyWithExtra(config: Configuration): Configuration = {
+private def configureExtra(): Unit = {
if (!isFailFastOnScalaTypeResolutionWithClassConfigured && !isFailFastOnScalaTypeResolutionWithClassDisabled) {
isFailFastOnScalaTypeResolutionWithClassConfigured = true
-val serializationOption =
-ConfigOptions.key("pipeline.serialization-config").stringType().asList().noDefaultValue()
-val serializationConfig = config.getOptional(serializationOption).orElse(new util.ArrayList[String])
-serializationConfig.add(
-"scala.Product: {type: typeinfo, class: org.apache.flinkx.api.typeinfo.FailFastTypeInfoFactory}"
-)
-serializationConfig.add(
-"scala.Option: {type: typeinfo, class: org.apache.flinkx.api.typeinfo.FailFastTypeInfoFactory}"
-)
-serializationConfig.add(
-"scala.util.Either: {type: typeinfo, class: org.apache.flinkx.api.typeinfo.FailFastTypeInfoFactory}"
-)
-serializationConfig.add(
-"scala.Array: {type: typeinfo, class: org.apache.flinkx.api.typeinfo.FailFastTypeInfoFactory}"
-)
-serializationConfig.add(
-"scala.collection.Iterable: {type: typeinfo, class: org.apache.flinkx.api.typeinfo.FailFastTypeInfoFactory}"
-)
-new Configuration(config).set(serializationOption, serializationConfig)
-} else {
-config
+try {
+val typeExtractorClass = Class.forName("org.apache.flink.api.java.typeutils.TypeExtractor")
+val registerFactoryMethod = typeExtractorClass.getMethod("registerFactory", classOf[Type], classOf[Class[_]])
+registerFactoryMethod.invoke(null, classOf[Product], classOf[FailFastTypeInfoFactory])
+registerFactoryMethod.invoke(null, classOf[Option[_]], classOf[FailFastTypeInfoFactory])
+registerFactoryMethod.invoke(null, classOf[Either[_, _]], classOf[FailFastTypeInfoFactory])
+registerFactoryMethod.invoke(null, classOf[Iterable[_]], classOf[FailFastTypeInfoFactory])
+} catch {
+case t: Throwable =>
+log.info(s"Unable to activate the 'fail-fast on Scala type resolution with Class' feature (available from Flink 1.19): $t")
+}
}
}
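
For readers tracing the reflective block above: on a classpath with Flink 1.19 or newer, where (per the log message) `TypeExtractor.registerFactory` is available, the calls amount to the direct registrations sketched below. The PR uses reflection so the library still loads against older Flink versions; this direct form is only illustrative and assumes the 1.19+ signature:

```scala
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flinkx.api.typeinfo.FailFastTypeInfoFactory

object FailFastRegistration {
  // Direct equivalent of the reflective registration (Flink 1.19+ only): each
  // factory makes type extraction fail fast on these Scala supertypes instead
  // of silently falling back to a generic Kryo serializer.
  def register(): Unit = {
    TypeExtractor.registerFactory(classOf[Product], classOf[FailFastTypeInfoFactory])
    TypeExtractor.registerFactory(classOf[Option[_]], classOf[FailFastTypeInfoFactory])
    TypeExtractor.registerFactory(classOf[Either[_, _]], classOf[FailFastTypeInfoFactory])
    TypeExtractor.registerFactory(classOf[Iterable[_]], classOf[FailFastTypeInfoFactory])
  }
}
```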
