|
24 | 24 | import org.apache.streampark.console.base.exception.ApiDetailException; |
25 | 25 |
|
26 | 26 | import org.apache.commons.io.FileUtils; |
| 27 | +import org.apache.commons.lang3.StringUtils; |
27 | 28 |
|
28 | 29 | import com.baomidou.mybatisplus.annotation.IdType; |
29 | 30 | import com.baomidou.mybatisplus.annotation.TableId; |
@@ -67,33 +68,61 @@ public class FlinkEnv implements Serializable { |
67 | 68 |
|
68 | 69 | private transient String streamParkScalaVersion = scala.util.Properties.versionNumberString(); |
69 | 70 |
|
| 71 | + private static final Float FLINK_CONFIG_CHANGE_VERSION = 1.19f; |
| 72 | + private static final String LEGACY_CONFIG_FILE = "flink-conf.yaml"; |
| 73 | + private static final String NEW_CONFIG_FILE = "config.yaml"; |
| 74 | + private static final String CONF_DIR = "/conf/"; |
| 75 | + |
70 | 76 | public void doSetFlinkConf() throws ApiDetailException { |
71 | | - File yaml; |
72 | | - float ver = Float.parseFloat(getVersionOfFirst().concat(".").concat(getVersionOfMiddle())); |
73 | | - if (ver < 1.19f) { |
74 | | - yaml = new File(this.flinkHome.concat("/conf/flink-conf.yaml")); |
75 | | - if (!yaml.exists()) { |
76 | | - throw new ApiAlertException("cannot find flink-conf.yaml in flink/conf "); |
| 77 | + Float version = getVersionNumber(); |
| 78 | + File configFile = resolveConfigFile(version); |
| 79 | + |
| 80 | + try { |
| 81 | + String flinkConf = FileUtils.readFileToString(configFile, StandardCharsets.UTF_8); |
| 82 | + this.flinkConf = DeflaterUtils.zipString(flinkConf); |
| 83 | + } catch (Exception e) { |
| 84 | + throw new ApiDetailException( |
| 85 | + "Failed to read Flink configuration file: " + configFile.getAbsolutePath(), e); |
| 86 | + } |
| 87 | + } |
| 88 | + |
| 89 | + private File resolveConfigFile(Float version) throws ApiAlertException { |
| 90 | + String confDir = this.flinkHome + CONF_DIR; |
| 91 | + |
| 92 | + if (version < FLINK_CONFIG_CHANGE_VERSION) { |
| 93 | + // For Flink < 1.19, use flink-conf.yaml |
| 94 | + File configFile = new File(confDir + LEGACY_CONFIG_FILE); |
| 95 | + if (!configFile.exists()) { |
| 96 | + throw new ApiAlertException( |
| 97 | + String.format( |
| 98 | + "Cannot find %s in %s for Flink version %s", |
| 99 | + LEGACY_CONFIG_FILE, confDir, this.version)); |
77 | 100 | } |
78 | | - } else if (ver == 1.19f) { |
79 | | - yaml = new File(this.flinkHome.concat("/conf/flink-conf.yaml")); |
80 | | - if (!yaml.exists()) { |
81 | | - yaml = new File(this.flinkHome.concat("/conf/config.yaml")); |
| 101 | + return configFile; |
| 102 | + } else if (version.equals(FLINK_CONFIG_CHANGE_VERSION)) { |
| 103 | + // For Flink 1.19, try both config files (backward compatibility) |
| 104 | + File legacyConfigFile = new File(confDir + LEGACY_CONFIG_FILE); |
| 105 | + if (legacyConfigFile.exists()) { |
| 106 | + return legacyConfigFile; |
82 | 107 | } |
83 | | - if (!yaml.exists()) { |
84 | | - throw new ApiAlertException("cannot find config.yaml|flink-conf.yaml in flink/conf "); |
| 108 | + File newConfigFile = new File(confDir + NEW_CONFIG_FILE); |
| 109 | + if (newConfigFile.exists()) { |
| 110 | + return newConfigFile; |
85 | 111 | } |
| 112 | + throw new ApiAlertException( |
| 113 | + String.format( |
| 114 | + "Cannot find either %s or %s in %s for Flink version %s", |
| 115 | + LEGACY_CONFIG_FILE, NEW_CONFIG_FILE, confDir, this.version)); |
86 | 116 | } else { |
87 | | - yaml = new File(this.flinkHome.concat("/conf/config.yaml")); |
88 | | - if (!yaml.exists()) { |
89 | | - throw new ApiAlertException("cannot find config.yaml in flink/conf "); |
| 117 | + // For Flink > 1.19, use config.yaml |
| 118 | + File configFile = new File(confDir + NEW_CONFIG_FILE); |
| 119 | + if (!configFile.exists()) { |
| 120 | + throw new ApiAlertException( |
| 121 | + String.format( |
| 122 | + "Cannot find %s in %s for Flink version %s", |
| 123 | + NEW_CONFIG_FILE, confDir, this.version)); |
90 | 124 | } |
91 | | - } |
92 | | - try { |
93 | | - String flinkConf = FileUtils.readFileToString(yaml, StandardCharsets.UTF_8); |
94 | | - this.flinkConf = DeflaterUtils.zipString(flinkConf); |
95 | | - } catch (Exception e) { |
96 | | - throw new ApiDetailException(e); |
| 125 | + return configFile; |
97 | 126 | } |
98 | 127 | } |
99 | 128 |
|
@@ -144,4 +173,17 @@ public String getVersionOfMiddle() { |
144 | 173 | public String getVersionOfLast() { |
145 | 174 | return this.version.split("\\.")[2]; |
146 | 175 | } |
| 176 | + |
| 177 | + @JsonIgnore |
| 178 | + private Float getVersionNumber() { |
| 179 | + if (StringUtils.isNotBlank(this.version)) { |
| 180 | + return Float.parseFloat(getVersionOfFirst() + "." + getVersionOfMiddle()); |
| 181 | + } |
| 182 | + throw new RuntimeException("Flink version is null"); |
| 183 | + } |
| 184 | + |
| 185 | + @JsonIgnore |
| 186 | + public boolean isLegacyFlinkConf() { |
| 187 | + return getVersionNumber() < FLINK_CONFIG_CHANGE_VERSION; |
| 188 | + } |
147 | 189 | } |
0 commit comments