@@ -827,9 +827,8 @@ object StreamExecutionEnvironment {
827827 * cluster.
828828 */
829829 def getExecutionEnvironment : StreamExecutionEnvironment = {
830- val configuration = new Configuration
831- configureFailFastOnScalaTypeResolutionWithClass(configuration)
832- new StreamExecutionEnvironment (JavaEnv .getExecutionEnvironment(configuration))
830+ val extraConfig = copyWithExtra(new Configuration )
831+ new StreamExecutionEnvironment (JavaEnv .getExecutionEnvironment(extraConfig))
833832 }
834833
835834 /** Creates an execution environment that represents the context in which the program is currently executed.
@@ -838,8 +837,8 @@ object StreamExecutionEnvironment {
838837 * Pass a custom configuration into the cluster.
839838 */
840839 def getExecutionEnvironment (configuration : Configuration ): StreamExecutionEnvironment = {
841- configureFailFastOnScalaTypeResolutionWithClass (configuration)
842- new StreamExecutionEnvironment (JavaEnv .getExecutionEnvironment(configuration ))
840+ val extraConfig = copyWithExtra (configuration)
841+ new StreamExecutionEnvironment (JavaEnv .getExecutionEnvironment(extraConfig ))
843842 }
844843
845844 // --------------------------------------------------------------------------
@@ -853,9 +852,8 @@ object StreamExecutionEnvironment {
853852 * [[setDefaultLocalParallelism(Int) ]].
854853 */
855854 def createLocalEnvironment (parallelism : Int = JavaEnv .getDefaultLocalParallelism): StreamExecutionEnvironment = {
856- val configuration = new Configuration
857- configureFailFastOnScalaTypeResolutionWithClass(configuration)
858- new StreamExecutionEnvironment (JavaEnv .createLocalEnvironment(parallelism, configuration))
855+ val extraConfig = copyWithExtra(new Configuration )
856+ new StreamExecutionEnvironment (JavaEnv .createLocalEnvironment(parallelism, extraConfig))
859857 }
860858
861859 /** Creates a local execution environment. The local execution environment will run the program in a multi-threaded
@@ -867,8 +865,8 @@ object StreamExecutionEnvironment {
867865 * Pass a custom configuration into the cluster.
868866 */
869867 def createLocalEnvironment (parallelism : Int , configuration : Configuration ): StreamExecutionEnvironment = {
870- configureFailFastOnScalaTypeResolutionWithClass (configuration)
871- new StreamExecutionEnvironment (JavaEnv .createLocalEnvironment(parallelism, configuration ))
868+ val extraConfig = copyWithExtra (configuration)
869+ new StreamExecutionEnvironment (JavaEnv .createLocalEnvironment(parallelism, extraConfig ))
872870 }
873871
874872 /** Creates a [[StreamExecutionEnvironment ]] for local program execution that also starts the web monitoring UI.
@@ -886,9 +884,8 @@ object StreamExecutionEnvironment {
886884 */
887885 @ PublicEvolving
888886 def createLocalEnvironmentWithWebUI (config : Configuration = null ): StreamExecutionEnvironment = {
889- val conf : Configuration = if (config == null ) new Configuration () else config
890- configureFailFastOnScalaTypeResolutionWithClass(conf)
891- new StreamExecutionEnvironment (JavaEnv .createLocalEnvironmentWithWebUI(conf))
887+ val extraConfig = copyWithExtra(if (config == null ) new Configuration () else config)
888+ new StreamExecutionEnvironment (JavaEnv .createLocalEnvironmentWithWebUI(extraConfig))
892889 }
893890
894891 // --------------------------------------------------------------------------
@@ -909,9 +906,8 @@ object StreamExecutionEnvironment {
909906 * user-defined input formats, or any libraries, those must be provided in the JAR files.
910907 */
911908 def createRemoteEnvironment (host : String , port : Int , jarFiles : String * ): StreamExecutionEnvironment = {
912- val configuration = new Configuration
913- configureFailFastOnScalaTypeResolutionWithClass(configuration)
914- new StreamExecutionEnvironment (JavaEnv .createRemoteEnvironment(host, port, configuration, jarFiles : _* ))
909+ val extraConfig = copyWithExtra(new Configuration )
910+ new StreamExecutionEnvironment (JavaEnv .createRemoteEnvironment(host, port, extraConfig, jarFiles : _* ))
915911 }
916912
917913 /** Creates a remote execution environment. The remote environment sends (parts of) the program to a cluster for
@@ -934,9 +930,8 @@ object StreamExecutionEnvironment {
934930 parallelism : Int ,
935931 jarFiles : String *
936932 ): StreamExecutionEnvironment = {
937- val configuration = new Configuration
938- configureFailFastOnScalaTypeResolutionWithClass(configuration)
939- val javaEnv = JavaEnv .createRemoteEnvironment(host, port, configuration, jarFiles : _* )
933+ val extraConfig = copyWithExtra(new Configuration )
934+ val javaEnv = JavaEnv .createRemoteEnvironment(host, port, extraConfig, jarFiles : _* )
940935 javaEnv.setParallelism(parallelism)
941936 new StreamExecutionEnvironment (javaEnv)
942937 }
@@ -949,7 +944,7 @@ object StreamExecutionEnvironment {
949944 * The host name or address of the master (JobManager), where the program should be executed.
950945 * @param port
951946 * The port of the master (JobManager), where the program should be executed.
952- * @param configuration
947+ * @param config
953948 * Pass a custom configuration into the cluster.
954949 * @param jarFiles
955950 * The JAR files with code that needs to be shipped to the cluster. If the program uses user-defined functions,
@@ -961,22 +956,30 @@ object StreamExecutionEnvironment {
961956 config : Configuration ,
962957 jarFiles : String *
963958 ): StreamExecutionEnvironment = {
964- configureFailFastOnScalaTypeResolutionWithClass (config)
965- val javaEnv = JavaEnv .createRemoteEnvironment(host, port, config , jarFiles : _* )
959+ val extraConfig = copyWithExtra (config)
960+ val javaEnv = JavaEnv .createRemoteEnvironment(host, port, extraConfig , jarFiles : _* )
966961 new StreamExecutionEnvironment (javaEnv)
967962 }
968963
969- private def configureFailFastOnScalaTypeResolutionWithClass (configuration : Configuration ): Unit = {
964+ /**
965+ * Copy input config and add extra configuration:
966+ * - register type info factories to fail-fast on Scala type resolution with Class
967+ * @param config input configuration
968+ * @return a copy of input config with extra configuration
969+ */
970+ private def copyWithExtra (config : Configuration ): Configuration = {
970971 if (! isFailFastOnScalaTypeResolutionWithClassConfigured && ! isFailFastOnScalaTypeResolutionWithClassDisabled) {
971972 isFailFastOnScalaTypeResolutionWithClassConfigured = true
972973 val serializationOption = ConfigOptions .key(" pipeline.serialization-config" ).stringType().asList().noDefaultValue()
973- val serializationConfig = configuration .getOptional(serializationOption).orElse(new util.ArrayList [String ])
974+ val serializationConfig = config .getOptional(serializationOption).orElse(new util.ArrayList [String ])
974975 serializationConfig.add(" scala.Product: {type: typeinfo, class: org.apache.flinkx.api.typeinfo.FailFastTypeInfoFactory}" )
975976 serializationConfig.add(" scala.Option: {type: typeinfo, class: org.apache.flinkx.api.typeinfo.FailFastTypeInfoFactory}" )
976977 serializationConfig.add(" scala.util.Either: {type: typeinfo, class: org.apache.flinkx.api.typeinfo.FailFastTypeInfoFactory}" )
977978 serializationConfig.add(" scala.Array: {type: typeinfo, class: org.apache.flinkx.api.typeinfo.FailFastTypeInfoFactory}" )
978979 serializationConfig.add(" scala.collection.Iterable: {type: typeinfo, class: org.apache.flinkx.api.typeinfo.FailFastTypeInfoFactory}" )
979- configuration.set(serializationOption, serializationConfig)
980+ config.clone().set(serializationOption, serializationConfig)
981+ } else {
982+ config
980983 }
981984 }
982985
0 commit comments