Skip to content

Commit 19a061c

Browse files
[Optimize] [dinky-gateway] Add default jobmanager.memory.process.size parameter (#4008)
Co-authored-by: yuhang2.zhang <[email protected]>
1 parent 8cbd5bb commit 19a061c

File tree

1 file changed

+14
-2
lines changed

1 file changed

+14
-2
lines changed

dinky-core/src/main/java/org/dinky/job/JobConfig.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.dinky.gateway.enums.SavePointStrategy;
3030
import org.dinky.gateway.model.FlinkClusterConfig;
3131

32+
import org.apache.commons.lang3.StringUtils;
3233
import org.apache.flink.configuration.Configuration;
3334
import org.apache.flink.configuration.CoreOptions;
3435
import org.apache.flink.configuration.RestOptions;
@@ -42,12 +43,14 @@
4243
import lombok.AllArgsConstructor;
4344
import lombok.Builder;
4445
import lombok.Data;
46+
import lombok.extern.slf4j.Slf4j;
4547

4648
/**
4749
* JobConfig
4850
*
4951
* @since 2021/6/27 18:45
5052
*/
53+
@Slf4j
5154
@Data
5255
@Builder
5356
@AllArgsConstructor
@@ -257,9 +260,18 @@ public void buildGatewayConfig(FlinkClusterConfig config) {
257260
Assert.notNull(customConfig.getValue(), "Custom flink config has null value");
258261
flinkConfig.getConfiguration().put(customConfig.getName(), customConfig.getValue());
259262
}
263+
264+
Map<String, String> configuration = flinkConfig.getConfiguration();
265+
266+
// In Kubernetes mode, must set jobmanager.memory.process.size.
267+
if (StringUtils.isBlank(configuration.get("jobmanager.memory.process.size"))) {
268+
log.warn("In Kubernetes mode, please configure 'jobmanager.memory.process.size', default 2048m");
269+
configuration.put("jobmanager.memory.process.size", "2048m");
270+
}
271+
260272
// Load job configuration content afterwards
261-
flinkConfig.getConfiguration().putAll(getConfigJson());
262-
flinkConfig.getConfiguration().put(CoreOptions.DEFAULT_PARALLELISM.key(), String.valueOf(parallelism));
273+
configuration.putAll(getConfigJson());
274+
configuration.put(CoreOptions.DEFAULT_PARALLELISM.key(), String.valueOf(parallelism));
263275
flinkConfig.setJobName(getJobName());
264276

265277
gatewayConfig = GatewayConfig.build(config);

0 commit comments

Comments
 (0)