Skip to content

Commit 0f15d10

Browse files
committed
retract stream convert type
1 parent 9aca0f0 commit 0f15d10

File tree

2 files changed

+11
-5
lines changed

2 files changed

+11
-5
lines changed

core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
4747
import org.apache.flink.api.common.time.Time;
4848
import org.apache.flink.api.common.typeinfo.TypeInformation;
49+
import org.apache.flink.api.java.tuple.Tuple2;
4950
import org.apache.flink.api.java.typeutils.RowTypeInfo;
5051
import org.apache.flink.calcite.shaded.com.google.common.base.Preconditions;
5152
import org.apache.flink.calcite.shaded.com.google.common.base.Strings;
@@ -271,7 +272,10 @@ private static void registerTable(SqlTree sqlTree, StreamExecutionEnvironment en
271272
Table adaptTable = adaptSql == null ? table : tableEnv.sqlQuery(adaptSql);
272273

273274
RowTypeInfo typeInfo = new RowTypeInfo(adaptTable.getSchema().getTypes(), adaptTable.getSchema().getColumnNames());
274-
DataStream adaptStream = tableEnv.toAppendStream(adaptTable, typeInfo);
275+
DataStream adaptStream = tableEnv.toRetractStream(adaptTable, typeInfo)
276+
.map((Tuple2<Boolean, Row> f0) -> { return f0.f1; })
277+
.returns(typeInfo);
278+
275279
String fields = String.join(",", typeInfo.getFieldNames());
276280

277281
if(waterMarkerAssigner.checkNeedAssignWaterMarker(sourceTableInfo)){

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* limitations under the License.
1717
*/
1818

19-
19+
2020

2121
package com.dtstack.flink.sql.side;
2222

@@ -41,13 +41,15 @@
4141
import org.apache.calcite.sql.parser.SqlParserPos;
4242
import org.apache.commons.collections.CollectionUtils;
4343
import org.apache.flink.api.common.typeinfo.TypeInformation;
44+
import org.apache.flink.api.java.tuple.Tuple2;
4445
import org.apache.flink.api.java.typeutils.RowTypeInfo;
4546
import org.apache.flink.calcite.shaded.com.google.common.collect.HashBasedTable;
4647
import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
4748
import org.apache.flink.calcite.shaded.com.google.common.collect.Maps;
4849
import org.apache.flink.streaming.api.datastream.DataStream;
4950
import org.apache.flink.table.api.Table;
5051
import org.apache.flink.table.api.java.StreamTableEnvironment;
52+
import org.apache.flink.types.Row;
5153

5254
import java.util.*;
5355

@@ -593,9 +595,9 @@ private void joinFun(Object pollObj, Map<String, Table> localTableCache,
593595
}
594596

595597
RowTypeInfo typeInfo = new RowTypeInfo(targetTable.getSchema().getTypes(), targetTable.getSchema().getColumnNames());
596-
//DataStream adaptStream = tableEnv.toRetractStream(targetTable, org.apache.flink.types.Row.class);
597-
DataStream adaptStream = tableEnv.toAppendStream(targetTable, org.apache.flink.types.Row.class);
598-
598+
DataStream adaptStream = tableEnv.toRetractStream(targetTable, org.apache.flink.types.Row.class)
599+
.map((Tuple2<Boolean, Row> f0) -> { return f0.f1; })
600+
.returns(typeInfo);
599601
//join side table before keyby ===> Reducing the size of each dimension table cache of async
600602
if(sideTableInfo.isPartitionedJoin()){
601603
List<String> leftJoinColList = getConditionFields(joinInfo.getCondition(), joinInfo.getLeftTableAlias());

0 commit comments

Comments
 (0)