Skip to content

Commit bc0d829

Browse files
Merge branch 'v1.8.0_dev' of http://git.dtstack.cn/dtstack/dt-center-flinkStreamSQL into v1.8.0_dev
ysq
2 parents 74af8b3 + 918b593 commit bc0d829

File tree

26 files changed

+268
-84
lines changed

26 files changed

+268
-84
lines changed

cassandra/cassandra-sink/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@
7171
</copy>
7272

7373
<move file="${basedir}/../../plugins/cassandrasink/${project.artifactId}-${project.version}.jar"
74-
tofile="${basedir}/../../plugins/cassandrasink/${project.name}.jar" />
74+
tofile="${basedir}/../../plugins/cassandrasink/${project.name}-${git.branch}.jar" />
7575
</tasks>
7676
</configuration>
7777
</execution>

console/console-sink/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@
6767
</copy>
6868

6969
<move file="${basedir}/../../plugins/consolesink/${project.artifactId}-${project.version}.jar"
70-
tofile="${basedir}/../../plugins/consolesink/${project.name}.jar" />
70+
tofile="${basedir}/../../plugins/consolesink/${project.name}-${git.branch}.jar" />
7171
</tasks>
7272
</configuration>
7373
</execution>

core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,6 @@
7070
import org.slf4j.Logger;
7171
import org.slf4j.LoggerFactory;
7272
import java.io.File;
73-
import java.io.IOException;
7473
import java.lang.reflect.Field;
7574
import java.lang.reflect.InvocationTargetException;
7675
import java.lang.reflect.Method;
@@ -227,14 +226,14 @@ private static void addEnvClassPath(StreamExecutionEnvironment env, Set<URL> cla
227226
private static void registerUDF(SqlTree sqlTree, List<URL> jarURList, URLClassLoader parentClassloader,
228227
StreamTableEnvironment tableEnv)
229228
throws ClassNotFoundException, NoSuchMethodException, IllegalAccessException, InvocationTargetException {
230-
//register urf
231-
URLClassLoader classLoader = null;
232229
List<CreateFuncParser.SqlParserResult> funcList = sqlTree.getFunctionList();
230+
if (funcList.isEmpty()) {
231+
return;
232+
}
233+
//load jar
234+
URLClassLoader classLoader = FlinkUtil.loadExtraJar(jarURList, parentClassloader);
235+
//register udf
233236
for (CreateFuncParser.SqlParserResult funcInfo : funcList) {
234-
//classloader
235-
if (classLoader == null) {
236-
classLoader = FlinkUtil.loadExtraJar(jarURList, parentClassloader);
237-
}
238237
FlinkUtil.registerUDF(funcInfo.getType(), funcInfo.getClassName(), funcInfo.getName(),
239238
tableEnv, classLoader);
240239
}
@@ -305,22 +304,21 @@ private static void registerTable(SqlTree sqlTree, StreamExecutionEnvironment en
305304
}
306305
}
307306

308-
private static StreamExecutionEnvironment getStreamExeEnv(Properties confProperties, String deployMode) throws IOException, NoSuchMethodException {
307+
private static StreamExecutionEnvironment getStreamExeEnv(Properties confProperties, String deployMode) throws Exception {
309308
StreamExecutionEnvironment env = !ClusterMode.local.name().equals(deployMode) ?
310309
StreamExecutionEnvironment.getExecutionEnvironment() :
311310
new MyLocalStreamEnvironment();
312311
env.getConfig().disableClosureCleaner();
313312
env.setParallelism(FlinkUtil.getEnvParallelism(confProperties));
313+
314314
Configuration globalJobParameters = new Configuration();
315+
//Configuration does not support setting properties as key-value pairs directly, so use reflection
315316
Method method = Configuration.class.getDeclaredMethod("setValueInternal", String.class, Object.class);
316317
method.setAccessible(true);
317-
confProperties.forEach((key,val) -> {
318-
try {
319-
method.invoke(globalJobParameters, key, val);
320-
} catch (Exception e) {
321-
LOG.error("set Configuration key:{},value:{} error:{}",key,val,e);
322-
}
323-
});
318+
for (Map.Entry<Object, Object> prop : confProperties.entrySet()) {
319+
method.invoke(globalJobParameters, prop.getKey(), prop.getValue());
320+
}
321+
324322
ExecutionConfig exeConfig = env.getConfig();
325323
if(exeConfig.getGlobalJobParameters() == null){
326324
exeConfig.setGlobalJobParameters(globalJobParameters);

core/src/main/java/com/dtstack/flink/sql/parser/SqlParser.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ public static SqlTree parseSql(String sql) throws Exception {
8888

8989
sqlParser.parseSql(childSql, sqlTree);
9090
result = true;
91+
break;
9192
}
9293

9394
if(!result){

core/src/main/java/com/dtstack/flink/sql/table/AbsSourceParser.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
package com.dtstack.flink.sql.table;
2222

23+
import com.dtstack.flink.sql.util.ClassUtil;
2324
import com.dtstack.flink.sql.util.MathUtil;
2425

2526
import java.util.regex.Matcher;
@@ -36,19 +37,21 @@
3637
public abstract class AbsSourceParser extends AbsTableParser {
3738

3839
private static final String VIRTUAL_KEY = "virtualFieldKey";
39-
4040
private static final String WATERMARK_KEY = "waterMarkKey";
41+
private static final String NOTNULL_KEY = "notNullKey";
4142

4243
private static Pattern virtualFieldKeyPattern = Pattern.compile("(?i)^(\\S+\\([^\\)]+\\))\\s+AS\\s+(\\w+)$");
43-
4444
private static Pattern waterMarkKeyPattern = Pattern.compile("(?i)^\\s*WATERMARK\\s+FOR\\s+(\\S+)\\s+AS\\s+withOffset\\(\\s*(\\S+)\\s*,\\s*(\\d+)\\s*\\)$");
45+
private static Pattern notNullKeyPattern = Pattern.compile("(?i)^(\\w+)\\s+(\\w+)\\s+NOT\\s+NULL?$");
4546

4647
static {
4748
keyPatternMap.put(VIRTUAL_KEY, virtualFieldKeyPattern);
4849
keyPatternMap.put(WATERMARK_KEY, waterMarkKeyPattern);
50+
keyPatternMap.put(NOTNULL_KEY, notNullKeyPattern);
4951

5052
keyHandlerMap.put(VIRTUAL_KEY, AbsSourceParser::dealVirtualField);
5153
keyHandlerMap.put(WATERMARK_KEY, AbsSourceParser::dealWaterMark);
54+
keyHandlerMap.put(NOTNULL_KEY, AbsSourceParser::dealNotNull);
5255
}
5356

5457
static void dealVirtualField(Matcher matcher, TableInfo tableInfo){
@@ -66,4 +69,18 @@ static void dealWaterMark(Matcher matcher, TableInfo tableInfo){
6669
sourceTableInfo.setEventTimeField(eventTimeField);
6770
sourceTableInfo.setMaxOutOrderness(offset);
6871
}
72+
73+
static void dealNotNull(Matcher matcher, TableInfo tableInfo) {
74+
String fieldName = matcher.group(1);
75+
String fieldType = matcher.group(2);
76+
Class fieldClass= ClassUtil.stringConvertClass(fieldType);
77+
TableInfo.FieldExtraInfo fieldExtraInfo = new TableInfo.FieldExtraInfo();
78+
fieldExtraInfo.setNotNull(true);
79+
80+
tableInfo.addPhysicalMappings(fieldName, fieldName);
81+
tableInfo.addField(fieldName);
82+
tableInfo.addFieldClass(fieldClass);
83+
tableInfo.addFieldType(fieldType);
84+
tableInfo.addFieldExtraInfo(fieldExtraInfo);
85+
}
6986
}

core/src/main/java/com/dtstack/flink/sql/table/AbsTableParser.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ public void parseFieldsInfo(String fieldsInfo, TableInfo tableInfo){
105105
tableInfo.addField(fieldName);
106106
tableInfo.addFieldClass(fieldClass);
107107
tableInfo.addFieldType(fieldType);
108+
tableInfo.addFieldExtraInfo(null);
108109
}
109110

110111
tableInfo.finish();

core/src/main/java/com/dtstack/flink/sql/table/TableInfo.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ public abstract class TableInfo implements Serializable {
5757

5858
private final List<Class> fieldClassList = Lists.newArrayList();
5959

60+
private final List<FieldExtraInfo> fieldExtraInfoList = Lists.newArrayList();
61+
6062
private List<String> primaryKeys;
6163

6264
private Integer parallelism = 1;
@@ -143,6 +145,10 @@ public void setFieldClasses(Class<?>[] fieldClasses) {
143145
this.fieldClasses = fieldClasses;
144146
}
145147

148+
public void addFieldExtraInfo(FieldExtraInfo extraInfo) {
149+
fieldExtraInfoList.add(extraInfo);
150+
}
151+
146152
public List<String> getFieldList() {
147153
return fieldList;
148154
}
@@ -159,6 +165,10 @@ public Map<String, String> getPhysicalFields() {
159165
return physicalFields;
160166
}
161167

168+
public List<FieldExtraInfo> getFieldExtraInfoList() {
169+
return fieldExtraInfoList;
170+
}
171+
162172
public void setPhysicalFields(Map<String, String> physicalFields) {
163173
this.physicalFields = physicalFields;
164174
}
@@ -168,4 +178,25 @@ public void finish(){
168178
this.fieldClasses = fieldClassList.toArray(new Class[fieldClassList.size()]);
169179
this.fieldTypes = fieldTypeList.toArray(new String[fieldTypeList.size()]);
170180
}
181+
182+
/**
183+
* field extra info, used to store column constraints such as `not null` and `default 0`;
184+
*
185+
* currently only `not null` is supported
186+
*/
187+
public static class FieldExtraInfo implements Serializable {
188+
189+
/**
190+
* default false:allow field is null
191+
*/
192+
boolean notNull = false;
193+
194+
public boolean getNotNull() {
195+
return notNull;
196+
}
197+
198+
public void setNotNull(boolean notNull) {
199+
this.notNull = notNull;
200+
}
201+
}
171202
}

core/src/main/java/com/dtstack/flink/sql/util/FlinkUtil.java

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ public static void setStreamTimeCharacteristic(StreamExecutionEnvironment env, P
137137
if(characteristicStr.equalsIgnoreCase(tmp.toString())){
138138
env.setStreamTimeCharacteristic(tmp);
139139
flag = true;
140+
break;
140141
}
141142
}
142143

@@ -245,21 +246,9 @@ public static long getBufferTimeoutMillis(Properties properties){
245246
}
246247

247248
public static URLClassLoader loadExtraJar(List<URL> jarURLList, URLClassLoader classLoader) throws NoSuchMethodException, IllegalAccessException, InvocationTargetException {
248-
249-
int size = 0;
250-
for(URL url : jarURLList){
251-
if(url.toString().endsWith(".jar")){
252-
size++;
253-
}
254-
}
255-
256-
URL[] urlArray = new URL[size];
257-
int i=0;
258249
for(URL url : jarURLList){
259250
if(url.toString().endsWith(".jar")){
260-
urlArray[i] = url;
261251
urlClassLoaderAddUrl(classLoader, url);
262-
i++;
263252
}
264253
}
265254

hbase/hbase-sink/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@
6868
</copy>
6969

7070
<move file="${basedir}/../../plugins/hbasesink/${project.artifactId}-${project.version}.jar"
71-
tofile="${basedir}/../../plugins/hbasesink/${project.name}.jar" />
71+
tofile="${basedir}/../../plugins/hbasesink/${project.name}-${git.branch}.jar" />
7272
</tasks>
7373
</configuration>
7474
</execution>

kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
import com.dtstack.flink.sql.source.AbsDeserialization;
2525
import com.dtstack.flink.sql.source.kafka.metric.KafkaTopicPartitionLagMetric;
26+
import com.dtstack.flink.sql.table.TableInfo;
2627
import org.apache.flink.api.common.typeinfo.TypeInformation;
2728
import org.apache.flink.api.common.typeinfo.Types;
2829
import org.apache.flink.api.java.typeutils.RowTypeInfo;
@@ -49,6 +50,7 @@
4950
import java.sql.Time;
5051
import java.sql.Timestamp;
5152
import java.util.Iterator;
53+
import java.util.List;
5254
import java.util.Map;
5355
import java.util.Set;
5456

@@ -80,9 +82,6 @@ public class CustomerJsonDeserialization extends AbsDeserialization<Row> {
8082
/** Types to parse fields as. Indices match fieldNames indices. */
8183
private final TypeInformation<?>[] fieldTypes;
8284

83-
/** Flag indicating whether to fail on a missing field. */
84-
private boolean failOnMissingField;
85-
8685
private AbstractFetcher<Row, ?> fetcher;
8786

8887
private boolean firstMsg = true;
@@ -91,15 +90,14 @@ public class CustomerJsonDeserialization extends AbsDeserialization<Row> {
9190

9291
private Map<String, String> rowAndFieldMapping;
9392

93+
private List<TableInfo.FieldExtraInfo> fieldExtraInfos;
9494

95-
public CustomerJsonDeserialization(TypeInformation<Row> typeInfo, Map<String, String> rowAndFieldMapping){
95+
public CustomerJsonDeserialization(TypeInformation<Row> typeInfo, Map<String, String> rowAndFieldMapping, List<TableInfo.FieldExtraInfo> fieldExtraInfos){
9696
this.typeInfo = typeInfo;
97-
9897
this.fieldNames = ((RowTypeInfo) typeInfo).getFieldNames();
99-
10098
this.fieldTypes = ((RowTypeInfo) typeInfo).getFieldTypes();
101-
10299
this.rowAndFieldMapping= rowAndFieldMapping;
100+
this.fieldExtraInfos = fieldExtraInfos;
103101
}
104102

105103
@Override
@@ -129,9 +127,10 @@ public Row deserialize(byte[] message) throws IOException {
129127

130128
for (int i = 0; i < fieldNames.length; i++) {
131129
JsonNode node = getIgnoreCase(fieldNames[i]);
130+
TableInfo.FieldExtraInfo fieldExtraInfo = fieldExtraInfos.get(i);
132131

133132
if (node == null) {
134-
if (failOnMissingField) {
133+
if (fieldExtraInfo != null && fieldExtraInfo.getNotNull()) {
135134
throw new IllegalStateException("Failed to find field with name '"
136135
+ fieldNames[i] + "'.");
137136
} else {
@@ -159,10 +158,6 @@ public Row deserialize(byte[] message) throws IOException {
159158
}
160159
}
161160

162-
public void setFailOnMissingField(boolean failOnMissingField) {
163-
this.failOnMissingField = failOnMissingField;
164-
}
165-
166161
private JsonNode getIgnoreCase(String key) {
167162
String nodeMappingKey = rowAndFieldMapping.getOrDefault(key, key);
168163
JsonNode node = nodeAndJsonNodeMapping.get(nodeMappingKey);

0 commit comments

Comments
 (0)