@@ -32,7 +32,7 @@ import org.apache.flink.api.connector.source.{Source, SourceSplit}
3232import org .apache .flink .api .java .tuple
3333import org .apache .flink .api .java .typeutils .ResultTypeQueryable
3434import org .apache .flink .api .java .typeutils .runtime .kryo .KryoSerializer
35- import org .apache .flink .configuration .{ConfigOptions , Configuration , ReadableConfig }
35+ import org .apache .flink .configuration .{Configuration , ReadableConfig }
3636import org .apache .flink .core .execution .{JobClient , JobListener }
3737import org .apache .flink .core .fs .Path
3838import org .apache .flink .runtime .state .StateBackend
@@ -43,7 +43,10 @@ import org.apache.flink.streaming.api.functions.source._
4343import org .apache .flink .streaming .api .graph .StreamGraph
4444import org .apache .flink .util .{SplittableIterator , TernaryBoolean }
4545import org .apache .flinkx .api .ScalaStreamOps ._
46+ import org .apache .flinkx .api .typeinfo .FailFastTypeInfoFactory
47+ import org .slf4j .{Logger , LoggerFactory }
4648
49+ import java .lang .reflect .Type
4750import java .net .URI
4851import java .util
4952import scala .jdk .CollectionConverters ._
@@ -801,6 +804,8 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
801804
802805object StreamExecutionEnvironment {
803806
807+ @ transient lazy val log : Logger = LoggerFactory .getLogger(classOf [StreamExecutionEnvironment ])
808+
804809 /** Sets the default parallelism that will be used for the local execution environment created by
805810 * [[createLocalEnvironment() ]].
806811 *
@@ -827,8 +832,8 @@ object StreamExecutionEnvironment {
827832 * cluster.
828833 */
829834 def getExecutionEnvironment : StreamExecutionEnvironment = {
830- val extraConfig = copyWithExtra( new Configuration )
831- new StreamExecutionEnvironment (JavaEnv .getExecutionEnvironment(extraConfig) )
835+ configureExtra( )
836+ new StreamExecutionEnvironment (JavaEnv .getExecutionEnvironment)
832837 }
833838
834839 /** Creates an execution environment that represents the context in which the program is currently executed.
@@ -837,8 +842,8 @@ object StreamExecutionEnvironment {
837842 * Pass a custom configuration into the cluster.
838843 */
839844 def getExecutionEnvironment (configuration : Configuration ): StreamExecutionEnvironment = {
840- val extraConfig = copyWithExtra(configuration )
841- new StreamExecutionEnvironment (JavaEnv .getExecutionEnvironment(extraConfig ))
845+ configureExtra( )
846+ new StreamExecutionEnvironment (JavaEnv .getExecutionEnvironment(configuration ))
842847 }
843848
844849 // --------------------------------------------------------------------------
@@ -852,8 +857,8 @@ object StreamExecutionEnvironment {
852857 * [[setDefaultLocalParallelism(Int) ]].
853858 */
854859 def createLocalEnvironment (parallelism : Int = JavaEnv .getDefaultLocalParallelism): StreamExecutionEnvironment = {
855- val extraConfig = copyWithExtra( new Configuration )
856- new StreamExecutionEnvironment (JavaEnv .createLocalEnvironment(parallelism, extraConfig ))
860+ configureExtra( )
861+ new StreamExecutionEnvironment (JavaEnv .createLocalEnvironment(parallelism))
857862 }
858863
859864 /** Creates a local execution environment. The local execution environment will run the program in a multi-threaded
@@ -865,8 +870,8 @@ object StreamExecutionEnvironment {
865870 * Pass a custom configuration into the cluster.
866871 */
867872 def createLocalEnvironment (parallelism : Int , configuration : Configuration ): StreamExecutionEnvironment = {
868- val extraConfig = copyWithExtra(configuration )
869- new StreamExecutionEnvironment (JavaEnv .createLocalEnvironment(parallelism, extraConfig ))
873+ configureExtra( )
874+ new StreamExecutionEnvironment (JavaEnv .createLocalEnvironment(parallelism, configuration ))
870875 }
871876
872877 /** Creates a [[StreamExecutionEnvironment ]] for local program execution that also starts the web monitoring UI.
@@ -884,8 +889,9 @@ object StreamExecutionEnvironment {
884889 */
885890 @ PublicEvolving
886891 def createLocalEnvironmentWithWebUI (config : Configuration = null ): StreamExecutionEnvironment = {
887- val extraConfig = copyWithExtra(if (config == null ) new Configuration () else config)
888- new StreamExecutionEnvironment (JavaEnv .createLocalEnvironmentWithWebUI(extraConfig))
892+ configureExtra()
893+ val conf : Configuration = if (config == null ) new Configuration () else config
894+ new StreamExecutionEnvironment (JavaEnv .createLocalEnvironmentWithWebUI(conf))
889895 }
890896
891897 // --------------------------------------------------------------------------
@@ -906,8 +912,8 @@ object StreamExecutionEnvironment {
906912 * user-defined input formats, or any libraries, those must be provided in the JAR files.
907913 */
908914 def createRemoteEnvironment (host : String , port : Int , jarFiles : String * ): StreamExecutionEnvironment = {
909- val extraConfig = copyWithExtra( new Configuration )
910- new StreamExecutionEnvironment (JavaEnv .createRemoteEnvironment(host, port, extraConfig, jarFiles : _* ))
915+ configureExtra( )
916+ new StreamExecutionEnvironment (JavaEnv .createRemoteEnvironment(host, port, jarFiles : _* ))
911917 }
912918
913919 /** Creates a remote execution environment. The remote environment sends (parts of) the program to a cluster for
@@ -930,8 +936,8 @@ object StreamExecutionEnvironment {
930936 parallelism : Int ,
931937 jarFiles : String *
932938 ): StreamExecutionEnvironment = {
933- val extraConfig = copyWithExtra( new Configuration )
934- val javaEnv = JavaEnv .createRemoteEnvironment(host, port, extraConfig , jarFiles : _* )
939+ configureExtra( )
940+ val javaEnv = JavaEnv .createRemoteEnvironment(host, port, jarFiles : _* )
935941 javaEnv.setParallelism(parallelism)
936942 new StreamExecutionEnvironment (javaEnv)
937943 }
@@ -956,42 +962,28 @@ object StreamExecutionEnvironment {
956962 config : Configuration ,
957963 jarFiles : String *
958964 ): StreamExecutionEnvironment = {
959- val extraConfig = copyWithExtra(config )
960- val javaEnv = JavaEnv .createRemoteEnvironment(host, port, extraConfig , jarFiles : _* )
965+ configureExtra( )
966+ val javaEnv = JavaEnv .createRemoteEnvironment(host, port, config , jarFiles : _* )
961967 new StreamExecutionEnvironment (javaEnv)
962968 }
963969
964- /** Copy input config and add extra configuration:
970+ /** Add extra configuration:
965971 * - register type info factories to fail-fast on Scala type resolution with Class
966- * @param config
967- * input configuration
968- * @return
969- * a copy of input config with extra configuration
970972 */
971- private def copyWithExtra ( config : Configuration ): Configuration = {
973+ private def configureExtra ( ): Unit = {
972974 if (! isFailFastOnScalaTypeResolutionWithClassConfigured && ! isFailFastOnScalaTypeResolutionWithClassDisabled) {
973975 isFailFastOnScalaTypeResolutionWithClassConfigured = true
974- val serializationOption =
975- ConfigOptions .key(" pipeline.serialization-config" ).stringType().asList().noDefaultValue()
976- val serializationConfig = config.getOptional(serializationOption).orElse(new util.ArrayList [String ])
977- serializationConfig.add(
978- " scala.Product: {type: typeinfo, class: org.apache.flinkx.api.typeinfo.FailFastTypeInfoFactory}"
979- )
980- serializationConfig.add(
981- " scala.Option: {type: typeinfo, class: org.apache.flinkx.api.typeinfo.FailFastTypeInfoFactory}"
982- )
983- serializationConfig.add(
984- " scala.util.Either: {type: typeinfo, class: org.apache.flinkx.api.typeinfo.FailFastTypeInfoFactory}"
985- )
986- serializationConfig.add(
987- " scala.Array: {type: typeinfo, class: org.apache.flinkx.api.typeinfo.FailFastTypeInfoFactory}"
988- )
989- serializationConfig.add(
990- " scala.collection.Iterable: {type: typeinfo, class: org.apache.flinkx.api.typeinfo.FailFastTypeInfoFactory}"
991- )
992- new Configuration (config).set(serializationOption, serializationConfig)
993- } else {
994- config
976+ try {
977+ val typeExtractorClass = Class .forName(" org.apache.flink.api.java.typeutils.TypeExtractor" )
978+ val registerFactoryMethod = typeExtractorClass.getMethod(" registerFactory" , classOf [Type ], classOf [Class [_]])
979+ registerFactoryMethod.invoke(null , classOf [Product ], classOf [FailFastTypeInfoFactory ])
980+ registerFactoryMethod.invoke(null , classOf [Option [_]], classOf [FailFastTypeInfoFactory ])
981+ registerFactoryMethod.invoke(null , classOf [Either [_, _]], classOf [FailFastTypeInfoFactory ])
982+ registerFactoryMethod.invoke(null , classOf [Iterable [_]], classOf [FailFastTypeInfoFactory ])
983+ } catch {
984+ case t : Throwable =>
985+ log.info(s " Unable to activate 'fail-fast on Scala type resolution with Class' feature: available from Flink 1.19: $t" )
986+ }
995987 }
996988 }
997989
0 commit comments