Skip to content

Commit 1d3e228

Browse files
authored
Merge pull request #48 from zhp8341/bug_fix_optimize
新增任务修改历史版本查询
2 parents 27ed006 + 19e7b21 commit 1d3e228

File tree

15 files changed

+866
-12
lines changed

15 files changed

+866
-12
lines changed

docs/sql/flink_web.sql

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,31 @@ ALTER TABLE job_config add `custom_args` varchar(128) DEFAULT NULL COMMENT '启
7373
ALTER TABLE job_config add `custom_main_class` varchar(128) DEFAULT NULL COMMENT '程序入口类' AFTER custom_args;
7474
ALTER TABLE job_config add `custom_jar_url` varchar(128) DEFAULT NULL COMMENT'自定义jar的http地址 如:http://ccblog.cn/xx.jar' AFTER custom_main_class;
7575

76+
77+
-- ----------------------------
78+
-- Table structure for job_config_history
79+
-- ----------------------------
80+
CREATE TABLE `job_config_history` (
81+
`id` bigint(11) unsigned NOT NULL AUTO_INCREMENT,
82+
`job_config_id` bigint(11) NOT NULL COMMENT 'job_config主表Id',
83+
`job_name` varchar(64) NOT NULL COMMENT '任务名称',
84+
`deploy_mode` varchar(64) NOT NULL COMMENT '提交模式: standalone 、yarn 、yarn-session ',
85+
`flink_run_config` varchar(512) NOT NULL COMMENT 'flink运行配置',
86+
`flink_sql` mediumtext NOT NULL COMMENT 'sql语句',
87+
`flink_checkpoint_config` varchar(512) DEFAULT NULL COMMENT 'checkPoint配置',
88+
`ext_jar_path` varchar(2048) DEFAULT NULL COMMENT 'udf地址及连接器jar 如http://xxx.xxx.com/flink-streaming-udf.jar',
89+
`version` int(11) NOT NULL DEFAULT '0' COMMENT '更新版本号',
90+
`is_deleted` tinyint(1) NOT NULL DEFAULT '0',
91+
`create_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
92+
`edit_time` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
93+
`creator` varchar(32) DEFAULT 'sys',
94+
`editor` varchar(32) DEFAULT 'sys',
95+
PRIMARY KEY (`id`),
96+
KEY `index_job_config_id` (`job_config_id`) USING BTREE
97+
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='flink任务配置历史变更表';
98+
99+
100+
76101
-- ----------------------------
77102
-- Table structure for job_run_log
78103
-- ----------------------------
@@ -98,6 +123,7 @@ CREATE TABLE `job_run_log` (
98123

99124
ALTER TABLE job_run_log add `run_ip` varchar(64) DEFAULT NULL COMMENT '任务运行所在的机器' AFTER local_log ;
100125

126+
101127
-- ----------------------------
102128
-- Table structure for savepoint_backup
103129
-- ----------------------------
Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
package com.flink.streaming.web.model.dto;
2+
3+
import cn.hutool.core.collection.CollectionUtil;
4+
import com.flink.streaming.web.model.entity.JobConfig;
5+
import com.flink.streaming.web.model.entity.JobConfigHistory;
6+
import lombok.Data;
7+
import org.apache.commons.compress.utils.Lists;
8+
9+
import java.io.Serializable;
10+
import java.util.Collections;
11+
import java.util.Date;
12+
import java.util.List;
13+
14+
/**
15+
* @author zhuhuipei
16+
* @date 2021/5/5
17+
* @time 19:49
18+
*/
19+
@Data
20+
public class JobConfigHistoryDTO implements Serializable {
21+
22+
private static final long serialVersionUID = 1L;
23+
24+
private Long id;
25+
26+
/**
27+
* job_config主表Id
28+
*/
29+
private Long jobConfigId;
30+
31+
/**
32+
* 任务名称
33+
*/
34+
private String jobName;
35+
36+
/**
37+
* 提交模式: standalone 、yarn 、yarn-session
38+
*/
39+
private String deployMode;
40+
41+
/**
42+
* flink运行配置
43+
*/
44+
private String flinkRunConfig;
45+
46+
/**
47+
* checkPoint配置
48+
*/
49+
private String flinkCheckpointConfig;
50+
51+
/**
52+
* udf地址及连接器jar 如http://xxx.xxx.com/flink-streaming-udf.jar
53+
*/
54+
private String extJarPath;
55+
56+
/**
57+
* 更新版本号
58+
*/
59+
private Integer version;
60+
61+
/**
62+
* 创建时间
63+
*/
64+
private Date createTime;
65+
66+
/**
67+
* 修改时间
68+
*/
69+
private Date editTime;
70+
71+
private String creator;
72+
73+
private String editor;
74+
75+
/**
76+
* sql语句
77+
*/
78+
private String flinkSql;
79+
80+
81+
public static JobConfigHistory toEntity(JobConfigHistoryDTO jobConfigHistoryDTO) {
82+
if (jobConfigHistoryDTO == null) {
83+
return null;
84+
}
85+
JobConfigHistory jobConfigHistory = new JobConfigHistory();
86+
jobConfigHistory.setId(jobConfigHistoryDTO.getId());
87+
jobConfigHistory.setJobConfigId(jobConfigHistoryDTO.getJobConfigId());
88+
jobConfigHistory.setJobName(jobConfigHistoryDTO.getJobName());
89+
jobConfigHistory.setDeployMode(jobConfigHistoryDTO.getDeployMode());
90+
jobConfigHistory.setFlinkRunConfig(jobConfigHistoryDTO.getFlinkRunConfig());
91+
jobConfigHistory.setFlinkCheckpointConfig(jobConfigHistoryDTO.getFlinkCheckpointConfig());
92+
jobConfigHistory.setExtJarPath(jobConfigHistoryDTO.getExtJarPath());
93+
jobConfigHistory.setVersion(jobConfigHistoryDTO.getVersion());
94+
jobConfigHistory.setCreateTime(jobConfigHistoryDTO.getCreateTime());
95+
jobConfigHistory.setEditTime(jobConfigHistoryDTO.getEditTime());
96+
jobConfigHistory.setCreator(jobConfigHistoryDTO.getCreator());
97+
jobConfigHistory.setEditor(jobConfigHistoryDTO.getEditor());
98+
jobConfigHistory.setFlinkSql(jobConfigHistoryDTO.getFlinkSql());
99+
return jobConfigHistory;
100+
}
101+
102+
103+
public static JobConfigHistoryDTO toDTO(JobConfigHistory jobConfigHistory) {
104+
if (jobConfigHistory == null) {
105+
return null;
106+
}
107+
JobConfigHistoryDTO jobConfigHistoryDTO = new JobConfigHistoryDTO();
108+
jobConfigHistoryDTO.setId(jobConfigHistory.getId());
109+
jobConfigHistoryDTO.setJobConfigId(jobConfigHistory.getJobConfigId());
110+
jobConfigHistoryDTO.setJobName(jobConfigHistory.getJobName());
111+
jobConfigHistoryDTO.setDeployMode(jobConfigHistory.getDeployMode());
112+
jobConfigHistoryDTO.setFlinkRunConfig(jobConfigHistory.getFlinkRunConfig());
113+
jobConfigHistoryDTO.setFlinkCheckpointConfig(jobConfigHistory.getFlinkCheckpointConfig());
114+
jobConfigHistoryDTO.setExtJarPath(jobConfigHistory.getExtJarPath());
115+
jobConfigHistoryDTO.setVersion(jobConfigHistory.getVersion());
116+
jobConfigHistoryDTO.setCreateTime(jobConfigHistory.getCreateTime());
117+
jobConfigHistoryDTO.setEditTime(jobConfigHistory.getEditTime());
118+
jobConfigHistoryDTO.setCreator(jobConfigHistory.getCreator());
119+
jobConfigHistoryDTO.setEditor(jobConfigHistory.getEditor());
120+
jobConfigHistoryDTO.setFlinkSql(jobConfigHistory.getFlinkSql());
121+
return jobConfigHistoryDTO;
122+
}
123+
124+
public static List<JobConfigHistoryDTO> toListDTO(List<JobConfigHistory> jobConfigHistoryList) {
125+
if (CollectionUtil.isEmpty(jobConfigHistoryList)) {
126+
return Collections.EMPTY_LIST;
127+
}
128+
129+
List<JobConfigHistoryDTO> list = Lists.newArrayList();
130+
131+
for (JobConfigHistory jobConfigHistory : jobConfigHistoryList) {
132+
133+
JobConfigHistoryDTO jobConfigHistoryDTO = JobConfigHistoryDTO.toDTO(jobConfigHistory);
134+
if (jobConfigHistoryDTO != null) {
135+
list.add(jobConfigHistoryDTO);
136+
}
137+
}
138+
139+
return list;
140+
}
141+
142+
143+
public static JobConfigHistoryDTO to(JobConfig jobConfig) {
144+
if (jobConfig == null) {
145+
return null;
146+
}
147+
JobConfigHistoryDTO jobConfigHistoryDTO = new JobConfigHistoryDTO();
148+
jobConfigHistoryDTO.setJobConfigId (jobConfig.getId());
149+
jobConfigHistoryDTO.setJobName(jobConfig.getJobName());
150+
jobConfigHistoryDTO.setDeployMode(jobConfig.getDeployMode());
151+
jobConfigHistoryDTO.setFlinkRunConfig(jobConfig.getFlinkRunConfig());
152+
jobConfigHistoryDTO.setFlinkCheckpointConfig(jobConfig.getFlinkCheckpointConfig());
153+
jobConfigHistoryDTO.setExtJarPath(jobConfig.getExtJarPath());
154+
jobConfigHistoryDTO.setVersion(jobConfig.getVersion());
155+
jobConfigHistoryDTO.setCreateTime(jobConfig.getCreateTime());
156+
jobConfigHistoryDTO.setEditTime(jobConfig.getEditTime());
157+
jobConfigHistoryDTO.setCreator(jobConfig.getCreator());
158+
jobConfigHistoryDTO.setEditor(jobConfig.getEditor());
159+
jobConfigHistoryDTO.setFlinkSql(jobConfig.getFlinkSql());
160+
return jobConfigHistoryDTO;
161+
}
162+
}
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
package com.flink.streaming.web.model.entity;
2+
3+
import lombok.Data;
4+
5+
import java.io.Serializable;
6+
import java.util.Date;
7+
8+
/**
9+
*
10+
* @author zhuhuipei
11+
* @date 2021/5/5
12+
* @time 19:49
13+
*/
14+
@Data
15+
public class JobConfigHistory implements Serializable {
16+
17+
private static final long serialVersionUID = 1L;
18+
19+
private Long id;
20+
21+
/**
22+
* job_config主表Id
23+
*/
24+
private Long jobConfigId;
25+
26+
/**
27+
* 任务名称
28+
*/
29+
private String jobName;
30+
31+
/**
32+
* 提交模式: standalone 、yarn 、yarn-session
33+
*/
34+
private String deployMode;
35+
36+
/**
37+
* flink运行配置
38+
*/
39+
private String flinkRunConfig;
40+
41+
/**
42+
* checkPoint配置
43+
*/
44+
private String flinkCheckpointConfig;
45+
46+
/**
47+
* udf地址及连接器jar 如http://xxx.xxx.com/flink-streaming-udf.jar
48+
*/
49+
private String extJarPath;
50+
51+
/**
52+
* 更新版本号
53+
*/
54+
private Integer version;
55+
56+
private Boolean isDeleted;
57+
58+
/**
59+
* 创建时间
60+
*/
61+
private Date createTime;
62+
63+
/**
64+
* 修改时间
65+
*/
66+
private Date editTime;
67+
68+
private String creator;
69+
70+
private String editor;
71+
72+
/**
73+
* sql语句
74+
*/
75+
private String flinkSql;
76+
77+
78+
79+
}
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
package com.flink.streaming.web.model.vo;
2+
3+
import cn.hutool.core.collection.CollectionUtil;
4+
import com.flink.streaming.web.common.util.DateFormatUtils;
5+
import com.flink.streaming.web.model.dto.JobConfigHistoryDTO;
6+
import lombok.Data;
7+
8+
import java.io.Serializable;
9+
import java.util.ArrayList;
10+
import java.util.Collections;
11+
import java.util.List;
12+
13+
/**
14+
* @author zhuhuipei
15+
* @date 2021/5/5
16+
* @time 19:49
17+
*/
18+
@Data
19+
public class JobConfigHistoryVO implements Serializable {
20+
21+
private static final long serialVersionUID = 1L;
22+
23+
private Long id;
24+
25+
/**
26+
* job_config主表Id
27+
*/
28+
private Long jobConfigId;
29+
30+
/**
31+
* 任务名称
32+
*/
33+
private String jobName;
34+
35+
/**
36+
* 提交模式: standalone 、yarn 、yarn-session
37+
*/
38+
private String deployMode;
39+
40+
/**
41+
* flink运行配置
42+
*/
43+
private String flinkRunConfig;
44+
45+
/**
46+
* checkPoint配置
47+
*/
48+
private String flinkCheckpointConfig;
49+
50+
/**
51+
* udf地址及连接器jar 如http://xxx.xxx.com/flink-streaming-udf.jar
52+
*/
53+
private String extJarPath;
54+
55+
/**
56+
* 更新版本号
57+
*/
58+
private Integer version;
59+
60+
/**
61+
* 创建时间
62+
*/
63+
private String createTime;
64+
65+
/**
66+
* 修改时间
67+
*/
68+
private String editTime;
69+
70+
private String creator;
71+
72+
private String editor;
73+
74+
/**
75+
* sql语句
76+
*/
77+
private String flinkSql;
78+
79+
80+
public static JobConfigHistoryVO toVO(JobConfigHistoryDTO jobConfigHistoryDTO, boolean isFlinkSql) {
81+
if (jobConfigHistoryDTO == null) {
82+
return null;
83+
}
84+
JobConfigHistoryVO jobConfigHistoryVO = new JobConfigHistoryVO();
85+
jobConfigHistoryVO.setId(jobConfigHistoryDTO.getId());
86+
jobConfigHistoryVO.setJobConfigId(jobConfigHistoryDTO.getJobConfigId());
87+
jobConfigHistoryVO.setJobName(jobConfigHistoryDTO.getJobName());
88+
jobConfigHistoryVO.setDeployMode(jobConfigHistoryDTO.getDeployMode());
89+
jobConfigHistoryVO.setFlinkRunConfig(jobConfigHistoryDTO.getFlinkRunConfig());
90+
jobConfigHistoryVO.setFlinkCheckpointConfig(jobConfigHistoryDTO.getFlinkCheckpointConfig());
91+
jobConfigHistoryVO.setExtJarPath(jobConfigHistoryDTO.getExtJarPath());
92+
jobConfigHistoryVO.setVersion(jobConfigHistoryDTO.getVersion());
93+
jobConfigHistoryVO.setCreateTime(DateFormatUtils.toFormatString(jobConfigHistoryDTO.getCreateTime()));
94+
jobConfigHistoryVO.setEditTime(DateFormatUtils.toFormatString(jobConfigHistoryDTO.getEditTime()));
95+
jobConfigHistoryVO.setCreator(jobConfigHistoryDTO.getCreator());
96+
jobConfigHistoryVO.setEditor(jobConfigHistoryDTO.getEditor());
97+
if (isFlinkSql) {
98+
jobConfigHistoryVO.setFlinkSql(jobConfigHistoryDTO.getFlinkSql());
99+
}
100+
return jobConfigHistoryVO;
101+
}
102+
103+
public static List<JobConfigHistoryVO> toListVO(List<JobConfigHistoryDTO> jobConfigHistoryDTOList) {
104+
if (CollectionUtil.isEmpty(jobConfigHistoryDTOList)) {
105+
return Collections.EMPTY_LIST;
106+
}
107+
List<JobConfigHistoryVO> list = new ArrayList<>();
108+
109+
for (JobConfigHistoryDTO jobConfigHistoryDTO : jobConfigHistoryDTOList) {
110+
list.add(toVO(jobConfigHistoryDTO, Boolean.FALSE));
111+
}
112+
113+
return list;
114+
}
115+
}

0 commit comments

Comments
 (0)