Skip to content

Commit e90cc37

Browse files
committed
merge 1.8
1 parent 3de90b0 commit e90cc37

File tree

6 files changed

+24
-11
lines changed

6 files changed

+24
-11
lines changed

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@
4545
import org.apache.calcite.sql.parser.SqlParseException;
4646
import org.apache.calcite.sql.parser.SqlParserPos;
4747
import org.apache.commons.collections.CollectionUtils;
48-
import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
4948
import org.apache.flink.api.common.typeinfo.TypeInformation;
5049
import org.apache.flink.api.java.tuple.Tuple2;
5150
import org.apache.flink.api.java.typeutils.RowTypeInfo;
@@ -54,13 +53,14 @@
5453
import com.google.common.collect.Maps;
5554
import org.apache.flink.streaming.api.datastream.DataStream;
5655
import org.apache.flink.table.api.Table;
57-
import org.apache.flink.table.api.TableEnvironment;
5856
import org.apache.flink.table.api.java.StreamTableEnvironment;
5957
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
6058
import org.apache.flink.types.Row;
6159
import org.slf4j.Logger;
6260
import org.slf4j.LoggerFactory;
6361
import java.sql.Timestamp;
62+
import java.time.LocalDateTime;
63+
import java.util.Arrays;
6464
import java.util.Collection;
6565
import java.util.LinkedList;
6666
import java.util.List;
@@ -128,7 +128,6 @@ public void exec(String sql, Map<String, SideTableInfo> sideTableMap, StreamTabl
128128
System.out.println("----------real exec sql-----------" );
129129
System.out.println(pollSqlNode.toString());
130130
FlinkSQLExec.sqlUpdate(tableEnv, pollSqlNode.toString());
131-
// tableEnv.sqlUpdate(pollSqlNode.toString());
132131
if(LOG.isInfoEnabled()){
133132
LOG.info("exec sql: " + pollSqlNode.toString());
134133
}
@@ -160,15 +159,12 @@ private void addAliasForFieldNode(SqlNode pollSqlNode, List<String> fieldList, H
160159
SqlNode source = ((SqlInsert) pollSqlNode).getSource();
161160
addAliasForFieldNode(source, fieldList, mappingTable);
162161
break;
163-
164162
case AS:
165163
addAliasForFieldNode(((SqlBasicCall) pollSqlNode).getOperands()[0], fieldList, mappingTable);
166164
break;
167165

168166
case SELECT:
169-
170167
SqlNodeList selectList = ((SqlSelect) pollSqlNode).getSelectList();
171-
172168
selectList.getList().forEach(node -> {
173169
if (node.getKind() == IDENTIFIER) {
174170
SqlIdentifier sqlIdentifier = (SqlIdentifier) node;
@@ -183,15 +179,13 @@ private void addAliasForFieldNode(SqlNode pollSqlNode, List<String> fieldList, H
183179

184180
}
185181
});
186-
187182
for (int i = 0; i < selectList.getList().size(); i++) {
188183
SqlNode node = selectList.get(i);
189184
if (node.getKind() == IDENTIFIER) {
190185
SqlIdentifier sqlIdentifier = (SqlIdentifier) node;
191186
if (sqlIdentifier.names.size() == 1) {
192187
return;
193188
}
194-
195189
String name = sqlIdentifier.names.get(1);
196190
// avoid real field pv0 convert pv
197191
if (name.endsWith("0") && !fieldList.contains(name) && !fieldList.contains(name.substring(0, name.length() - 1))) {
@@ -268,7 +262,7 @@ private RowTypeInfo buildLeftTableOutType(RowTypeInfo leftTypeInfo) {
268262

269263
private TypeInformation convertTimeAttributeType(TypeInformation typeInformation) {
270264
if (typeInformation instanceof TimeIndicatorTypeInfo) {
271-
return TypeInformation.of(Timestamp.class);
265+
return TypeInformation.of(LocalDateTime.class);
272266
}
273267
return typeInformation;
274268
}

impala/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,12 @@
4242
<version>2.6.12</version>
4343
</dependency>
4444

45+
<dependency>
46+
<groupId>org.apache.hadoop</groupId>
47+
<artifactId>hadoop-common</artifactId>
48+
<version>2.7.3</version>
49+
</dependency>
50+
4551
</dependencies>
4652

4753

kafka/kafka-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,10 +56,12 @@ public class CustomerJsonDeserialization extends AbsDeserialization<Row> {
5656
private static final long serialVersionUID = 2385115520960444192L;
5757

5858
private AbstractFetcher<Row, ?> fetcher;
59+
private TypeInformation<Row> typeInfo;
5960

6061
private boolean firstMsg = true;
6162

6263
public CustomerJsonDeserialization(TypeInformation<Row> typeInfo, Map<String, String> rowAndFieldMapping, List<TableInfo.FieldExtraInfo> fieldExtraInfos){
64+
this.typeInfo = typeInfo;
6365
this.jsonDataParser= new JsonDataParser(typeInfo,rowAndFieldMapping,fieldExtraInfos);
6466
}
6567

kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,12 @@ public class CustomerJsonDeserialization extends AbsDeserialization<Row> {
6060
private static final long serialVersionUID = 2385115520960444192L;
6161

6262
private AbstractFetcher<Row, ?> fetcher;
63+
private TypeInformation<Row> typeInfo;
6364

6465
private boolean firstMsg = true;
6566

6667
public CustomerJsonDeserialization(TypeInformation<Row> typeInfo, Map<String, String> rowAndFieldMapping, List<TableInfo.FieldExtraInfo> fieldExtraInfos){
68+
this.typeInfo = typeInfo;
6769
this.jsonDataParser= new JsonDataParser(typeInfo,rowAndFieldMapping,fieldExtraInfos);
6870
}
6971

@@ -120,6 +122,11 @@ private static String partitionLagMetricName(TopicPartition tp) {
120122
return tp + ".records-lag";
121123
}
122124

125+
public void setFetcher(AbstractFetcher<Row, ?> fetcher) {
126+
this.fetcher = fetcher;
127+
}
128+
129+
123130

124131
@Override
125132
public TypeInformation<Row> getProducedType() {

kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,12 @@ public class CustomerJsonDeserialization extends AbsDeserialization<Row> {
6060
private static final long serialVersionUID = 2385115520960444192L;
6161

6262
private AbstractFetcher<Row, ?> fetcher;
63+
private TypeInformation<Row> typeInfo;
6364

6465
private boolean firstMsg = true;
6566

6667
public CustomerJsonDeserialization(TypeInformation<Row> typeInfo, Map<String, String> rowAndFieldMapping, List<TableInfo.FieldExtraInfo> fieldExtraInfos){
68+
this.typeInfo = typeInfo;
6769
this.jsonDataParser= new JsonDataParser(typeInfo,rowAndFieldMapping,fieldExtraInfos);
6870
}
6971

kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,11 +60,13 @@ public class CustomerJsonDeserialization extends AbsDeserialization<Row> {
6060
private static final long serialVersionUID = 2385115520960444192L;
6161

6262
private AbstractFetcher<Row, ?> fetcher;
63+
private TypeInformation<Row> typeInfo;
6364

6465
private boolean firstMsg = true;
6566

66-
public CustomerJsonDeserialization(TypeInformation<Row> typeInfo, Map<String, String> rowAndFieldMapping, List<TableInfo.FieldExtraInfo> fieldExtraInfos){
67-
this.jsonDataParser= new JsonDataParser(typeInfo,rowAndFieldMapping,fieldExtraInfos);
67+
public CustomerJsonDeserialization(TypeInformation<Row> typeInfo, Map<String, String> rowAndFieldMapping, List<TableInfo.FieldExtraInfo> fieldExtraInfos) {
68+
this.typeInfo = typeInfo;
69+
this.jsonDataParser = new JsonDataParser(typeInfo, rowAndFieldMapping, fieldExtraInfos);
6870
}
6971

7072
@Override

0 commit comments

Comments
 (0)