Skip to content

Commit 91efc6e

Browse files
committed
use retract stream
1 parent 5c5a519 commit 91efc6e

File tree

1 file changed

+1
-3
lines changed

1 file changed

+1
-3
lines changed

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -593,9 +593,7 @@ private void joinFun(Object pollObj, Map<String, Table> localTableCache,
593593
}
594594

595595
RowTypeInfo typeInfo = new RowTypeInfo(targetTable.getSchema().getTypes(), targetTable.getSchema().getColumnNames());
596-
//DataStream adaptStream = tableEnv.toRetractStream(targetTable, org.apache.flink.types.Row.class);
597-
DataStream adaptStream = tableEnv.toAppendStream(targetTable, org.apache.flink.types.Row.class);
598-
596+
DataStream adaptStream = tableEnv.toRetractStream(targetTable, org.apache.flink.types.Row.class);
599597
//join side table before keyby ===> Reducing the size of each dimension table cache of async
600598
if(sideTableInfo.isPartitionedJoin()){
601599
List<String> leftJoinColList = getConditionFields(joinInfo.getCondition(), joinInfo.getLeftTableAlias());

0 commit comments

Comments
 (0)