2424import org .apache .commons .io .Charsets ;
2525import org .apache .commons .lang .StringUtils ;
2626import org .apache .flink .client .program .ClusterClient ;
27- import org .apache .flink .client .program .MiniClusterClient ;
28- import org .apache .flink .configuration .ConfigConstants ;
27+ import org .apache .flink .client .program .rest .RestClusterClient ;
2928import org .apache .flink .configuration .Configuration ;
3029import org .apache .flink .configuration .GlobalConfiguration ;
3130import org .apache .flink .configuration .JobManagerOptions ;
3231import org .apache .flink .core .fs .FileSystem ;
3332import org .apache .flink .runtime .akka .AkkaUtils ;
34- import org .apache .flink .runtime .minicluster .MiniCluster ;
35- import org .apache .flink .runtime .minicluster .MiniClusterConfiguration ;
3633import org .apache .flink .runtime .util .LeaderConnectionInfo ;
3734import org .apache .flink .yarn .AbstractYarnClusterDescriptor ;
3835import org .apache .flink .yarn .YarnClusterDescriptor ;
5754 */
5855public class ClusterClientFactory {
5956
57+ private static final Logger LOG = LoggerFactory .getLogger (ClusterClientFactory .class );
58+
59+ private static final String HA_CLUSTER_ID = "high-availability.cluster-id" ;
60+
61+ private static final String HIGH_AVAILABILITY = "high-availability" ;
62+
63+ private static final String NODE = "NONE" ;
64+
65+ private static final String ZOOKEEPER = "zookeeper" ;
66+
67+ private static final String HADOOP_CONF = "fs.hdfs.hadoopconf" ;
68+
6069 public static ClusterClient createClusterClient (Options launcherOptions ) throws Exception {
6170 String mode = launcherOptions .getMode ();
6271 if (mode .equals (ClusterMode .standalone .name ())) {
@@ -70,10 +79,12 @@ public static ClusterClient createClusterClient(Options launcherOptions) throws
7079 public static ClusterClient createStandaloneClient (Options launcherOptions ) throws Exception {
7180 String flinkConfDir = launcherOptions .getFlinkconf ();
7281 Configuration config = GlobalConfiguration .loadConfiguration (flinkConfDir );
73- MiniClusterConfiguration .Builder configBuilder = new MiniClusterConfiguration .Builder ();
74- configBuilder .setConfiguration (config );
75- MiniCluster miniCluster = new MiniCluster (configBuilder .build ());
76- MiniClusterClient clusterClient = new MiniClusterClient (config , miniCluster );
82+
83+ LOG .info ("------------config params-------------------------" );
84+ config .toMap ().forEach ((key , value ) -> LOG .info ("{}: {}" , key , value ));
85+ LOG .info ("-------------------------------------------" );
86+
87+ RestClusterClient clusterClient = new RestClusterClient <>(config , "clusterClient" );
7788 LeaderConnectionInfo connectionInfo = clusterClient .getClusterConnectionInfo ();
7889 InetSocketAddress address = AkkaUtils .getInetSocketAddressFromAkkaURL (connectionInfo .getAddress ());
7990 config .setString (JobManagerOptions .ADDRESS , address .getAddress ().getHostName ());
@@ -89,6 +100,8 @@ public static ClusterClient createYarnSessionClient(Options launcherOptions) {
89100
90101 if (StringUtils .isNotBlank (yarnConfDir )) {
91102 try {
103+ boolean isHighAvailability ;
104+
92105 config .setString (ConfigConstants .PATH_HADOOP_CONFIG , yarnConfDir );
93106 FileSystem .initialize (config );
94107
@@ -101,6 +114,7 @@ public static ClusterClient createYarnSessionClient(Options launcherOptions) {
101114 String yarnSessionConf = launcherOptions .getYarnSessionConf ();
102115 yarnSessionConf = URLDecoder .decode (yarnSessionConf , Charsets .UTF_8 .toString ());
103116 Properties yarnSessionConfProperties = PluginUtil .jsonStrToObject (yarnSessionConf , Properties .class );
117+
104118 Object yid = yarnSessionConfProperties .get ("yid" );
105119
106120 if (null != yid ) {
@@ -109,12 +123,22 @@ public static ClusterClient createYarnSessionClient(Options launcherOptions) {
109123 applicationId = getYarnClusterApplicationId (yarnClient );
110124 }
111125
112- System . out . println ( " applicationId=" + applicationId .toString ());
126+ LOG . info ( "current applicationId = {}" , applicationId .toString ());
113127
114128 if (StringUtils .isEmpty (applicationId .toString ())) {
115129 throw new RuntimeException ("No flink session found on yarn cluster." );
116130 }
117131
132+ isHighAvailability = config .getString (HIGH_AVAILABILITY , NODE ).equals (ZOOKEEPER );
133+
134+ if (isHighAvailability && config .getString (HA_CLUSTER_ID , null ) == null ) {
135+ config .setString (HA_CLUSTER_ID , applicationId .toString ());
136+ }
137+
138+ LOG .info ("------------config params-------------------------" );
139+ config .toMap ().forEach ((key , value ) -> LOG .info ("{}: {}" , key , value ));
140+ LOG .info ("-------------------------------------------" );
141+
118142 AbstractYarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor (config , yarnConf , flinkConfDir , yarnClient , false );
119143 ClusterClient clusterClient = clusterDescriptor .retrieve (applicationId );
120144 clusterClient .setDetached (true );
@@ -158,15 +182,15 @@ private static ApplicationId getYarnClusterApplicationId(YarnClient yarnClient)
158182
159183 }
160184
161- if (StringUtils .isEmpty (applicationId .toString ())) {
185+ if (applicationId == null || StringUtils .isEmpty (applicationId .toString ())) {
162186 throw new RuntimeException ("No flink session found on yarn cluster." );
163187 }
164188 return applicationId ;
165189 }
166190
167191 private static ApplicationId toApplicationId (String appIdStr ) {
168192 Iterator <String > it = StringHelper ._split (appIdStr ).iterator ();
169- if (!(it .next ()). equals ( "application" )) {
193+ if (!"application" . equals (it .next ())) {
170194 throw new IllegalArgumentException ("Invalid ApplicationId prefix: " + appIdStr + ". The valid ApplicationId should start with prefix " + "application" );
171195 } else {
172196 try {
0 commit comments