Commit 9072007

[kafka not null][flinkStreamSQL supports the NOT NULL syntax][17872]

1 parent 5517bd8 commit 9072007

File tree: 13 files changed, +92 -51 lines
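
Per the commit title, the change lets a source-table field be declared NOT NULL in the CREATE TABLE DDL. Below is a minimal sketch of the kind of statement this targets; the table name, field names, and WITH properties are invented for illustration, and the kafka table parser that actually consumes the NOT NULL token is among the three changed files not excerpted on this page (assumes the project's SqlParser/SqlTree on the classpath):

    // Hypothetical DDL; only the NOT NULL token is the point here.
    String ddl =
            "CREATE TABLE MyKafkaSource(" +
            "    id INT NOT NULL,"  +      // a message missing "id" should now fail fast
            "    name VARCHAR"      +      // nullable: a missing "name" stays null in the Row
            ") WITH (" +
            "    type = 'kafka11'," +      // connector properties are placeholders
            "    topic = 'demo_topic'" +
            ");";
    SqlTree sqlTree = SqlParser.parseSql(ddl);   // parseSql as shown in SqlParser.java below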

cassandra/cassandra-sink/pom.xml

Lines changed: 1 addition & 1 deletion

@@ -71,7 +71,7 @@
 </copy>
 
 <move file="${basedir}/../../plugins/cassandrasink/${project.artifactId}-${project.version}.jar"
-    tofile="${basedir}/../../plugins/cassandrasink/${project.name}.jar" />
+    tofile="${basedir}/../../plugins/cassandrasink/${project.name}.jar-${git.branch}.jar" />
 </tasks>
 </configuration>
 </execution>

console/console-sink/pom.xml

Lines changed: 1 addition & 1 deletion

@@ -67,7 +67,7 @@
 </copy>
 
 <move file="${basedir}/../../plugins/consolesink/${project.artifactId}-${project.version}.jar"
-    tofile="${basedir}/../../plugins/consolesink/${project.name}.jar" />
+    tofile="${basedir}/../../plugins/consolesink/${project.name}.jar-${git.branch}.jar" />
 </tasks>
 </configuration>
 </execution>

core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 4 additions & 18 deletions

@@ -78,7 +78,6 @@
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.net.URLDecoder;

@@ -251,14 +250,11 @@ private static void addEnvClassPath(StreamExecutionEnvironment env, Set<URL> cla
     private static void registerUDF(SqlTree sqlTree, List<URL> jarURList, URLClassLoader parentClassloader,
                                     StreamTableEnvironment tableEnv)
             throws ClassNotFoundException, NoSuchMethodException, IllegalAccessException, InvocationTargetException {
+        //load jar
+        URLClassLoader classLoader = FlinkUtil.loadExtraJar(jarURList, parentClassloader);
         //register udf
-        URLClassLoader classLoader = null;
         List<CreateFuncParser.SqlParserResult> funcList = sqlTree.getFunctionList();
         for (CreateFuncParser.SqlParserResult funcInfo : funcList) {
-            //classloader
-            if (classLoader == null) {
-                classLoader = FlinkUtil.loadExtraJar(jarURList, parentClassloader);
-            }
             FlinkUtil.registerUDF(funcInfo.getType(), funcInfo.getClassName(), funcInfo.getName(),
                     tableEnv, classLoader);
         }

@@ -335,19 +331,9 @@ private static StreamExecutionEnvironment getStreamExeEnv(Properties confPropert
                 new MyLocalStreamEnvironment();
         env.getConfig().disableClosureCleaner();
         env.setParallelism(FlinkUtil.getEnvParallelism(confProperties));
+
         Configuration globalJobParameters = new Configuration();
-        Method method = Configuration.class.getDeclaredMethod("setValueInternal", String.class, Object.class);
-        method.setAccessible(true);
-
-        confProperties.forEach((key,val) -> {
-            try {
-                method.invoke(globalJobParameters, key, val);
-            } catch (IllegalAccessException e) {
-                e.printStackTrace();
-            } catch (InvocationTargetException e) {
-                e.printStackTrace();
-            }
-        });
+        globalJobParameters.addAllToProperties(confProperties);
 
         ExecutionConfig exeConfig = env.getConfig();
         if(exeConfig.getGlobalJobParameters() == null){
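
For context on the last hunk: whatever ends up in globalJobParameters is attached to the ExecutionConfig (the truncated tail of getStreamExeEnv presumably sets it when none is present), and rich functions can read it back at runtime. A minimal sketch under the stock Flink 1.x API; the class and property key below are hypothetical, not from this repo:

    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.configuration.Configuration;

    // Hypothetical consumer of the parameters registered in getStreamExeEnv.
    public class ReadGlobalParams extends RichMapFunction<String, String> {

        private String sampleValue;

        @Override
        public void open(Configuration parameters) {
            // Read back the global job parameters from the runtime's ExecutionConfig.
            ExecutionConfig.GlobalJobParameters globalParams =
                    getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
            // "sample.key" is a made-up property name, not one flinkStreamSQL defines.
            sampleValue = globalParams.toMap().get("sample.key");
        }

        @Override
        public String map(String value) {
            return value + "|" + sampleValue;
        }
    }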

core/src/main/java/com/dtstack/flink/sql/parser/SqlParser.java

Lines changed: 1 addition & 0 deletions

@@ -88,6 +88,7 @@ public static SqlTree parseSql(String sql) throws Exception {
 
             sqlParser.parseSql(childSql, sqlTree);
             result = true;
+            break;
         }
 
         if(!result){

core/src/main/java/com/dtstack/flink/sql/table/AbsTableParser.java

Lines changed: 1 addition & 0 deletions

@@ -105,6 +105,7 @@ public void parseFieldsInfo(String fieldsInfo, TableInfo tableInfo){
             tableInfo.addField(fieldName);
             tableInfo.addFieldClass(fieldClass);
             tableInfo.addFieldType(fieldType);
+            tableInfo.addFieldExtraInfo(null);
         }
 
         tableInfo.finish();

core/src/main/java/com/dtstack/flink/sql/table/TableInfo.java

Lines changed: 31 additions & 0 deletions

@@ -57,6 +57,8 @@ public abstract class TableInfo implements Serializable {
 
     private final List<Class> fieldClassList = Lists.newArrayList();
 
+    private final List<FieldExtraInfo> fieldExtraInfoList = Lists.newArrayList();
+
     private List<String> primaryKeys;
 
     private Integer parallelism = 1;

@@ -143,6 +145,10 @@ public void setFieldClasses(Class<?>[] fieldClasses) {
         this.fieldClasses = fieldClasses;
     }
 
+    public void addFieldExtraInfo(FieldExtraInfo extraInfo) {
+        fieldExtraInfoList.add(extraInfo);
+    }
+
     public List<String> getFieldList() {
         return fieldList;
     }

@@ -159,6 +165,10 @@ public Map<String, String> getPhysicalFields() {
         return physicalFields;
     }
 
+    public List<FieldExtraInfo> getFieldExtraInfoList() {
+        return fieldExtraInfoList;
+    }
+
     public void setPhysicalFields(Map<String, String> physicalFields) {
         this.physicalFields = physicalFields;
     }

@@ -168,4 +178,25 @@ public void finish(){
         this.fieldClasses = fieldClassList.toArray(new Class[fieldClassList.size()]);
         this.fieldTypes = fieldTypeList.toArray(new String[fieldTypeList.size()]);
     }
+
+    /**
+     * Field extra info, used to store attributes such as `not null` or `default 0`;
+     * for now, only NOT NULL is supported.
+     */
+    public static class FieldExtraInfo implements Serializable {
+
+        /**
+         * default false: the field is allowed to be null
+         */
+        boolean notNull;
+
+        public boolean getNotNull() {
+            return notNull;
+        }
+
+        public void setNotNull(boolean notNull) {
+            this.notNull = notNull;
+        }
+    }
 }
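
The null placeholder added in AbsTableParser above and the new list here share an invariant: fieldExtraInfoList stays index-aligned with fieldList, which is what lets the Kafka deserializer further down look extra info up by field position. A hedged sketch using only methods from this commit (tableInfo stands for any concrete TableInfo subclass instance; the DDL fragment is hypothetical):

    // Parsing a field list like "id INT NOT NULL, name VARCHAR":
    TableInfo.FieldExtraInfo notNullInfo = new TableInfo.FieldExtraInfo();
    notNullInfo.setNotNull(true);

    tableInfo.addField("id");
    tableInfo.addFieldExtraInfo(notNullInfo);   // index 0: id is NOT NULL

    tableInfo.addField("name");
    tableInfo.addFieldExtraInfo(null);          // index 1: plain field, null keeps the lists aligned

    // Invariant relied on downstream:
    // tableInfo.getFieldExtraInfoList().get(i) describes tableInfo.getFieldList().get(i)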

core/src/main/java/com/dtstack/flink/sql/util/FlinkUtil.java

Lines changed: 1 addition & 12 deletions

@@ -136,6 +136,7 @@ public static void setStreamTimeCharacteristic(StreamExecutionEnvironment env, P
             if(characteristicStr.equalsIgnoreCase(tmp.toString())){
                 env.setStreamTimeCharacteristic(tmp);
                 flag = true;
+                break;
             }
         }
 

@@ -244,21 +245,9 @@ public static long getBufferTimeoutMillis(Properties properties){
     }
 
     public static URLClassLoader loadExtraJar(List<URL> jarURLList, URLClassLoader classLoader) throws NoSuchMethodException, IllegalAccessException, InvocationTargetException {
-
-        int size = 0;
-        for(URL url : jarURLList){
-            if(url.toString().endsWith(".jar")){
-                size++;
-            }
-        }
-
-        URL[] urlArray = new URL[size];
-        int i=0;
         for(URL url : jarURLList){
             if(url.toString().endsWith(".jar")){
-                urlArray[i] = url;
                 urlClassLoaderAddUrl(classLoader, url);
-                i++;
             }
         }
 
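
The deleted first pass only counted jars in order to size an urlArray that, judging by this diff, was never read; the commit collapses the method to a single pass. A reconstruction of the resulting method (the tail after the loop is truncated in this excerpt, so the return is an assumption):

    public static URLClassLoader loadExtraJar(List<URL> jarURLList, URLClassLoader classLoader)
            throws NoSuchMethodException, IllegalAccessException, InvocationTargetException {
        // Single pass: add every jar URL to the classloader via the reflective helper.
        for (URL url : jarURLList) {
            if (url.toString().endsWith(".jar")) {
                urlClassLoaderAddUrl(classLoader, url);
            }
        }
        return classLoader;  // assumed: the truncated tail returns the (mutated) classloader
    }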

hbase/hbase-sink/pom.xml

Lines changed: 1 addition & 1 deletion

@@ -68,7 +68,7 @@
 </copy>
 
 <move file="${basedir}/../../plugins/hbasesink/${project.artifactId}-${project.version}.jar"
-    tofile="${basedir}/../../plugins/hbasesink/${project.name}.jar" />
+    tofile="${basedir}/../../plugins/hbasesink/${project.name}.jar-${git.branch}.jar" />
 </tasks>
 </configuration>
 </execution>

kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java

Lines changed: 7 additions & 12 deletions

@@ -23,6 +23,7 @@
 
 import com.dtstack.flink.sql.source.AbsDeserialization;
 import com.dtstack.flink.sql.source.kafka.metric.KafkaTopicPartitionLagMetric;
+import com.dtstack.flink.sql.table.TableInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;

@@ -50,6 +51,7 @@
 import java.sql.Time;
 import java.sql.Timestamp;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 

@@ -83,9 +85,6 @@ public class CustomerJsonDeserialization extends AbsDeserialization<Row> {
     /** Types to parse fields as. Indices match fieldNames indices. */
     private final TypeInformation<?>[] fieldTypes;
 
-    /** Flag indicating whether to fail on a missing field. */
-    private boolean failOnMissingField;
-
     private AbstractFetcher<Row, ?> fetcher;
 
     private boolean firstMsg = true;

@@ -94,15 +93,14 @@ public class CustomerJsonDeserialization extends AbsDeserialization<Row> {
 
     private Map<String, String> rowAndFieldMapping;
 
+    private List<TableInfo.FieldExtraInfo> fieldExtraInfos;
 
-    public CustomerJsonDeserialization(TypeInformation<Row> typeInfo, Map<String, String> rowAndFieldMapping){
+    public CustomerJsonDeserialization(TypeInformation<Row> typeInfo, Map<String, String> rowAndFieldMapping, List<TableInfo.FieldExtraInfo> fieldExtraInfos){
         this.typeInfo = typeInfo;
-
         this.fieldNames = ((RowTypeInfo) typeInfo).getFieldNames();
-
         this.fieldTypes = ((RowTypeInfo) typeInfo).getFieldTypes();
-
         this.rowAndFieldMapping= rowAndFieldMapping;
+        this.fieldExtraInfos = fieldExtraInfos;
     }
 
     @Override

@@ -133,9 +131,10 @@ public Row deserialize(byte[] message) throws IOException {
 
         for (int i = 0; i < fieldNames.length; i++) {
             JsonNode node = getIgnoreCase(fieldNames[i]);
+            TableInfo.FieldExtraInfo fieldExtraInfo = fieldExtraInfos.get(i);
 
             if (node == null) {
-                if (failOnMissingField) {
+                if (fieldExtraInfo != null && fieldExtraInfo.getNotNull()) {
                     throw new IllegalStateException("Failed to find field with name '"
                             + fieldNames[i] + "'.");
                 } else {

@@ -180,10 +179,6 @@ public JsonNode getIgnoreCase(String key) {
         return node;
     }
 
-    public void setFailOnMissingField(boolean failOnMissingField) {
-        this.failOnMissingField = failOnMissingField;
-    }
-
     private void parseTree(JsonNode jsonNode, String prefix){
 
         Iterator<String> iterator = jsonNode.fieldNames();
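
Net effect of these hunks: the coarse per-table failOnMissingField flag becomes a per-field decision driven by FieldExtraInfo. A hedged sketch of the resulting behaviour inside deserialize(); the JSON payloads are invented:

    // Given fields [id, name] where only id carries FieldExtraInfo{notNull=true}:
    //
    //   {"id": 1}        -> Row(1, null)   name is nullable, so a missing field stays null
    //   {"name": "foo"}  -> IllegalStateException: "Failed to find field with name 'id'."
    //
    // i.e. per field i:
    TableInfo.FieldExtraInfo extra = fieldExtraInfos.get(i);  // aligned with fieldNames[i]
    if (node == null && extra != null && extra.getNotNull()) {
        throw new IllegalStateException("Failed to find field with name '" + fieldNames[i] + "'.");
    }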

kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java

Lines changed: 2 additions & 3 deletions

@@ -89,13 +89,12 @@ public Table genStreamSource(SourceTableInfo sourceTableInfo, StreamExecutionEnv
         FlinkKafkaConsumer011<Row> kafkaSrc;
         if (BooleanUtils.isTrue(kafka011SourceTableInfo.getTopicIsPattern())) {
             kafkaSrc = new CustomerKafka011Consumer(Pattern.compile(topicName),
-                    new CustomerJsonDeserialization(typeInformation, kafka011SourceTableInfo.getPhysicalFields()), props);
+                    new CustomerJsonDeserialization(typeInformation, kafka011SourceTableInfo.getPhysicalFields(), kafka011SourceTableInfo.getFieldExtraInfoList()), props);
         } else {
             kafkaSrc = new CustomerKafka011Consumer(topicName,
-                    new CustomerJsonDeserialization(typeInformation, kafka011SourceTableInfo.getPhysicalFields()), props);
+                    new CustomerJsonDeserialization(typeInformation, kafka011SourceTableInfo.getPhysicalFields(), kafka011SourceTableInfo.getFieldExtraInfoList()), props);
         }
 
-
         //earliest,latest
         if ("earliest".equalsIgnoreCase(kafka011SourceTableInfo.getOffsetReset())) {
             kafkaSrc.setStartFromEarliest();
