Skip to content

Commit 8e4fa1b

Browse files
committed
modify sidesqlexc datastream type
1 parent 9476f03 commit 8e4fa1b

File tree

1 file changed

+3
-1
lines changed

1 file changed

+3
-1
lines changed

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -595,9 +595,11 @@ private void joinFun(Object pollObj, Map<String, Table> localTableCache,
595595
}
596596

597597
RowTypeInfo typeInfo = new RowTypeInfo(targetTable.getSchema().getTypes(), targetTable.getSchema().getColumnNames());
598+
598599
DataStream adaptStream = tableEnv.toRetractStream(targetTable, org.apache.flink.types.Row.class)
599600
.map((Tuple2<Boolean, Row> f0) -> { return f0.f1; })
600-
.returns(typeInfo);
601+
.returns(Row.class);
602+
601603
//join side table before keyby ===> Reducing the size of each dimension table cache of async
602604
if(sideTableInfo.isPartitionedJoin()){
603605
List<String> leftJoinColList = getConditionFields(joinInfo.getCondition(), joinInfo.getLeftTableAlias());

0 commit comments

Comments
 (0)