Skip to content

Commit 907f5a1

Browse files
committed
toAppendStream change to toRetactStream
1 parent 070cbd9 commit 907f5a1

File tree

2 files changed

+10
-2
lines changed

2 files changed

+10
-2
lines changed

core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
4747
import org.apache.flink.api.common.time.Time;
4848
import org.apache.flink.api.common.typeinfo.TypeInformation;
49+
import org.apache.flink.api.java.tuple.Tuple2;
4950
import org.apache.flink.api.java.typeutils.RowTypeInfo;
5051
import org.apache.flink.calcite.shaded.com.google.common.base.Preconditions;
5152
import org.apache.flink.calcite.shaded.com.google.common.base.Strings;
@@ -279,7 +280,10 @@ private static void registerTable(SqlTree sqlTree, StreamExecutionEnvironment en
279280
Table adaptTable = adaptSql == null ? table : tableEnv.sqlQuery(adaptSql);
280281

281282
RowTypeInfo typeInfo = new RowTypeInfo(adaptTable.getSchema().getTypes(), adaptTable.getSchema().getColumnNames());
282-
DataStream adaptStream = tableEnv.toAppendStream(adaptTable, typeInfo);
283+
DataStream adaptStream = tableEnv.toRetractStream(adaptTable, typeInfo)
284+
.map((Tuple2<Boolean, Row> f0) -> { return f0.f1; })
285+
.returns(typeInfo);
286+
283287
String fields = String.join(",", typeInfo.getFieldNames());
284288

285289
if(waterMarkerAssigner.checkNeedAssignWaterMarker(sourceTableInfo)){

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,13 +40,15 @@
4040
import org.apache.calcite.sql.parser.SqlParserPos;
4141
import org.apache.commons.collections.CollectionUtils;
4242
import org.apache.flink.api.common.typeinfo.TypeInformation;
43+
import org.apache.flink.api.java.tuple.Tuple2;
4344
import org.apache.flink.api.java.typeutils.RowTypeInfo;
4445
import org.apache.flink.calcite.shaded.com.google.common.collect.HashBasedTable;
4546
import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
4647
import org.apache.flink.calcite.shaded.com.google.common.collect.Maps;
4748
import org.apache.flink.streaming.api.datastream.DataStream;
4849
import org.apache.flink.table.api.Table;
4950
import org.apache.flink.table.api.java.StreamTableEnvironment;
51+
import org.apache.flink.types.Row;
5052

5153
import java.util.*;
5254

@@ -597,7 +599,9 @@ private void joinFun(Object pollObj, Map<String, Table> localTableCache,
597599
}
598600

599601
RowTypeInfo typeInfo = new RowTypeInfo(targetTable.getSchema().getTypes(), targetTable.getSchema().getColumnNames());
600-
DataStream adaptStream = tableEnv.toAppendStream(targetTable, org.apache.flink.types.Row.class);
602+
DataStream adaptStream = tableEnv.toRetractStream(targetTable, org.apache.flink.types.Row.class)
603+
.map((Tuple2<Boolean, Row> f0) -> { return f0.f1; })
604+
.returns(Row.class);
601605

602606
//join side table before keyby ===> Reducing the size of each dimension table cache of async
603607
if(sideTableInfo.isPartitionedJoin()){

0 commit comments

Comments
 (0)