Skip to content

Commit ab9f9ae

Browse files
committed
Merge remote-tracking branch 'origin/v1.8.0_dev' into v1.9.0_dev
2 parents ce02036 + b8d4811 commit ab9f9ae

File tree

7 files changed

+41
-71
lines changed

7 files changed

+41
-71
lines changed

README.md

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -50,10 +50,11 @@
5050
* 操作系统:理论上不限
5151
* kerberos环境需要在flink-conf.yaml配置security.kerberos.login.keytab以及security.kerberos.login.principal参数,配置案例:
5252
```
53+
## hadoop配置文件路径
5354
fs.hdfs.hadoopconf: /Users/maqi/tmp/hadoopconf/hadoop_250
5455
security.kerberos.login.use-ticket-cache: true
55-
security.kerberos.login.keytab: /Users/maqi/tmp/hadoopconf/hadoop_250/yanxi.keytab
56-
security.kerberos.login.principal: yanxi@DTSTACK.COM
56+
security.kerberos.login.keytab: /Users/maqi/tmp/hadoopconf/hadoop_250/maqi.keytab
57+
security.kerberos.login.principal: maqi@DTSTACK.COM
5758
security.kerberos.login.contexts: Client,KafkaClient
5859
zookeeper.sasl.service-name: zookeeper
5960
zookeeper.sasl.login-context-name: Client

core/src/main/java/com/dtstack/flink/sql/watermarker/AbsCustomerWaterMarker.java

Lines changed: 18 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -21,13 +21,16 @@
2121

2222
import com.dtstack.flink.sql.metric.EventDelayGauge;
2323
import com.dtstack.flink.sql.metric.MetricConstant;
24+
import com.dtstack.flink.sql.util.MathUtil;
2425
import org.apache.flink.api.common.functions.IterationRuntimeContext;
2526
import org.apache.flink.api.common.functions.RichFunction;
2627
import org.apache.flink.api.common.functions.RuntimeContext;
2728
import org.apache.flink.configuration.Configuration;
2829
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
2930
import org.apache.flink.streaming.api.windowing.time.Time;
3031

32+
import java.util.TimeZone;
33+
3134
/**
3235
* Reason:
3336
* Date: 2018/10/18
@@ -46,6 +49,12 @@ public abstract class AbsCustomerWaterMarker<T> extends BoundedOutOfOrdernessTim
4649

4750
protected transient EventDelayGauge eventDelayGauge;
4851

52+
protected int pos;
53+
54+
protected long lastTime = 0;
55+
56+
protected TimeZone timezone;
57+
4958
public AbsCustomerWaterMarker(Time maxOutOfOrderness) {
5059
super(maxOutOfOrderness);
5160
}
@@ -90,4 +99,13 @@ public void setRuntimeContext(RuntimeContext t) {
9099
public void setFromSourceTag(String fromSourceTag) {
91100
this.fromSourceTag = fromSourceTag;
92101
}
102+
103+
protected long getExtractTimestamp(Long extractTime){
104+
105+
lastTime = extractTime + timezone.getOffset(extractTime);
106+
107+
eventDelayGauge.setDelayTime(MathUtil.getIntegerVal((System.currentTimeMillis() - extractTime)/1000));
108+
109+
return lastTime;
110+
}
93111
}

core/src/main/java/com/dtstack/flink/sql/watermarker/CustomerWaterMarkerForLong.java

Lines changed: 1 addition & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -39,14 +39,6 @@ public class CustomerWaterMarkerForLong extends AbsCustomerWaterMarker<Row> {
3939

4040
private static final Logger logger = LoggerFactory.getLogger(CustomerWaterMarkerForLong.class);
4141

42-
private static final long serialVersionUID = 1L;
43-
44-
private int pos;
45-
46-
private long lastTime = 0;
47-
48-
private TimeZone timezone;
49-
5042
public CustomerWaterMarkerForLong(Time maxOutOfOrderness, int pos,String timezone) {
5143
super(maxOutOfOrderness);
5244
this.pos = pos;
@@ -58,12 +50,7 @@ public long extractTimestamp(Row row) {
5850

5951
try{
6052
Long extractTime = MathUtil.getLongVal(row.getField(pos));
61-
62-
lastTime = extractTime + timezone.getOffset(extractTime);
63-
64-
eventDelayGauge.setDelayTime(MathUtil.getIntegerVal((System.currentTimeMillis() - extractTime)/1000));
65-
66-
return lastTime;
53+
return getExtractTimestamp(extractTime);
6754
}catch (Exception e){
6855
logger.error("", e);
6956
}

core/src/main/java/com/dtstack/flink/sql/watermarker/CustomerWaterMarkerForTimeStamp.java

Lines changed: 1 addition & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -40,14 +40,6 @@ public class CustomerWaterMarkerForTimeStamp extends AbsCustomerWaterMarker<Row>
4040

4141
private static final Logger logger = LoggerFactory.getLogger(CustomerWaterMarkerForTimeStamp.class);
4242

43-
private static final long serialVersionUID = 1L;
44-
45-
private int pos;
46-
47-
private long lastTime = 0;
48-
49-
private TimeZone timezone;
50-
5143
public CustomerWaterMarkerForTimeStamp(Time maxOutOfOrderness, int pos,String timezone) {
5244
super(maxOutOfOrderness);
5345
this.pos = pos;
@@ -58,14 +50,8 @@ public CustomerWaterMarkerForTimeStamp(Time maxOutOfOrderness, int pos,String ti
5850
public long extractTimestamp(Row row) {
5951
try {
6052
Timestamp time = (Timestamp) row.getField(pos);
61-
6253
long extractTime=time.getTime();
63-
64-
lastTime = extractTime + timezone.getOffset(extractTime);
65-
66-
eventDelayGauge.setDelayTime(MathUtil.getIntegerVal((System.currentTimeMillis() - extractTime)/1000));
67-
68-
return lastTime;
54+
return getExtractTimestamp(extractTime);
6955
} catch (RuntimeException e) {
7056
logger.error("", e);
7157
}

core/src/main/java/com/dtstack/flink/sql/watermarker/WaterMarkerAssigner.java

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -28,8 +28,8 @@
2828
import org.apache.flink.streaming.api.windowing.time.Time;
2929
import org.apache.flink.types.Row;
3030
import org.apache.flink.util.Preconditions;
31-
32-
import java.util.TimeZone;
31+
import java.sql.Timestamp;
32+
import java.lang.Long;
3333

3434
/**
3535
* define watermarker
@@ -76,9 +76,9 @@ public DataStream assignWaterMarker(DataStream<Row> dataStream, RowTypeInfo type
7676
TypeInformation fieldType = fieldTypes[pos];
7777

7878
AbsCustomerWaterMarker waterMarker = null;
79-
if(fieldType.getTypeClass().getTypeName().equalsIgnoreCase("java.sql.Timestamp")){
79+
if(fieldType.getTypeClass().isAssignableFrom(Timestamp.class)){
8080
waterMarker = new CustomerWaterMarkerForTimeStamp(Time.milliseconds(maxOutOrderness), pos,timeZone);
81-
}else if(fieldType.getTypeClass().getTypeName().equalsIgnoreCase("java.lang.Long")){
81+
}else if(fieldType.getTypeClass().isAssignableFrom(Long.class)){
8282
waterMarker = new CustomerWaterMarkerForLong(Time.milliseconds(maxOutOrderness), pos,timeZone);
8383
}else{
8484
throw new IllegalArgumentException("not support type of " + fieldType + ", current only support(timestamp, long).");

launcher/src/main/java/com/dtstack/flink/sql/launcher/perjob/PerJobClusterClientBuilder.java

Lines changed: 12 additions & 33 deletions
Original file line number | Diff line number | Diff line change
@@ -26,6 +26,8 @@
2626
import org.apache.flink.configuration.Configuration;
2727
import com.google.common.base.Strings;
2828
import org.apache.flink.runtime.jobgraph.JobGraph;
29+
import org.apache.flink.runtime.security.SecurityConfiguration;
30+
import org.apache.flink.runtime.security.SecurityUtils;
2931
import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
3032
import org.apache.flink.yarn.YarnClusterDescriptor;
3133
import org.apache.hadoop.fs.Path;
@@ -55,45 +57,40 @@ public class PerJobClusterClientBuilder {
5557

5658
private static final Logger LOG = LoggerFactory.getLogger(PerJobClusterClientBuilder.class);
5759

58-
private static String KEYTAB = "security.kerberos.login.keytab";
59-
60-
private static String PRINCIPAL = "security.kerberos.login.principal";
60+
private static final String DEFAULT_CONF_DIR = "./";
6161

6262
private YarnClient yarnClient;
6363

6464
private YarnConfiguration yarnConf;
6565

66-
public void init(String yarnConfDir, Properties conf) throws IOException {
66+
private Configuration flinkConfig;
67+
68+
public void init(String yarnConfDir, Configuration flinkConfig, Properties userConf) throws Exception {
6769

6870
if(Strings.isNullOrEmpty(yarnConfDir)) {
6971
throw new RuntimeException("parameters of yarn is required");
7072
}
73+
userConf.forEach((key, val) -> flinkConfig.setString(key.toString(), val.toString()));
74+
this.flinkConfig = flinkConfig;
75+
SecurityUtils.install(new SecurityConfiguration(flinkConfig));
7176

7277
yarnConf = YarnConfLoader.getYarnConf(yarnConfDir);
73-
74-
if (isKerberos(conf)){
75-
String keytab = (String) conf.get(KEYTAB);
76-
String principal = (String) conf.get(PRINCIPAL);
77-
login(yarnConf, keytab, principal);
78-
}
79-
8078
yarnClient = YarnClient.createYarnClient();
8179
yarnClient.init(yarnConf);
8280
yarnClient.start();
8381

8482
System.out.println("----init yarn success ----");
8583
}
8684

87-
public AbstractYarnClusterDescriptor createPerJobClusterDescriptor(Properties confProp, String flinkJarPath, Options launcherOptions, JobGraph jobGraph, Configuration flinkConfig)
85+
public AbstractYarnClusterDescriptor createPerJobClusterDescriptor(String flinkJarPath, Options launcherOptions, JobGraph jobGraph)
8886
throws MalformedURLException {
8987

90-
confProp.forEach((key, val) -> flinkConfig.setString(key.toString(), val.toString()));
91-
String flinkConf = StringUtils.isEmpty(launcherOptions.getFlinkconf()) ? "" : launcherOptions.getFlinkconf();
88+
String flinkConf = StringUtils.isEmpty(launcherOptions.getFlinkconf()) ? DEFAULT_CONF_DIR : launcherOptions.getFlinkconf();
9289
AbstractYarnClusterDescriptor clusterDescriptor = getClusterDescriptor(flinkConfig, yarnConf, flinkConf);
9390

9491
if (StringUtils.isNotBlank(flinkJarPath)) {
9592
if (!new File(flinkJarPath).exists()) {
96-
throw new RuntimeException("The Flink jar path is not exist");
93+
throw new RuntimeException("The param '-flinkJarPath' ref dir is not exist");
9794
}
9895
}
9996

@@ -163,22 +160,4 @@ private AbstractYarnClusterDescriptor getClusterDescriptor(
163160
false);
164161
}
165162

166-
private boolean isKerberos(Properties conf){
167-
String keytab = (String) conf.get(KEYTAB);
168-
if (StringUtils.isNotBlank(keytab)){
169-
return true;
170-
} else {
171-
return false;
172-
}
173-
}
174-
175-
private void login(org.apache.hadoop.conf.Configuration conf, String keytab, String principal) throws IOException {
176-
if (StringUtils.isEmpty(principal)){
177-
throw new RuntimeException(PRINCIPAL + " must not be null!");
178-
}
179-
UserGroupInformation.setConfiguration(conf);
180-
UserGroupInformation.loginUserFromKeytab(principal, keytab);
181-
LOG.info("login successfully! keytab: " + keytab + "principal: " + principal);
182-
LOG.info("UGI: " + UserGroupInformation.getCurrentUser());
183-
}
184163
}

launcher/src/main/java/com/dtstack/flink/sql/launcher/perjob/PerJobSubmitter.java

Lines changed: 2 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -62,11 +62,10 @@ public static String submit(Options launcherOptions, JobGraph jobGraph, Configur
6262
ClusterSpecification clusterSpecification = FLinkPerJobResourceUtil.createClusterSpecification(confProperties);
6363

6464
PerJobClusterClientBuilder perJobClusterClientBuilder = new PerJobClusterClientBuilder();
65-
perJobClusterClientBuilder.init(launcherOptions.getYarnconf(), confProperties);
65+
perJobClusterClientBuilder.init(launcherOptions.getYarnconf(), flinkConfig, confProperties);
6666

6767
String flinkJarPath = launcherOptions.getFlinkJarPath();
68-
69-
AbstractYarnClusterDescriptor yarnClusterDescriptor = perJobClusterClientBuilder.createPerJobClusterDescriptor(confProperties, flinkJarPath, launcherOptions, jobGraph, flinkConfig);
68+
AbstractYarnClusterDescriptor yarnClusterDescriptor = perJobClusterClientBuilder.createPerJobClusterDescriptor(flinkJarPath, launcherOptions, jobGraph);
7069
ClusterClient<ApplicationId> clusterClient = yarnClusterDescriptor.deployJobCluster(clusterSpecification, jobGraph,true);
7170

7271
String applicationId = clusterClient.getClusterId().toString();

0 commit comments

Comments (0)