Skip to content

Commit 212a891

Browse files
committed
Merge remote-tracking branch 'origin/v1.8.0_dev' into v1.8.0_dev
2 parents 941964f + a0a8e7a commit 212a891

File tree

2 files changed

+7
-2
lines changed

2 files changed

+7
-2
lines changed

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -747,7 +747,7 @@ private void joinFun(Object pollObj, Map<String, Table> localTableCache,
747747
DataStream adaptStream = tableEnv.toRetractStream(targetTable, org.apache.flink.types.Row.class)
748748
.map((Tuple2<Boolean, Row> f0) -> { return f0.f1; })
749749
.returns(Row.class);
750-
adaptStream.getTransformation().setOutputType(leftTypeInfo);
750+
//adaptStream.getTransformation().setOutputType(leftTypeInfo);
751751

752752
//join side table before keyby ===> Reducing the size of each dimension table cache of async
753753
if(sideTableInfo.isPartitionedJoin()){

kudu/kudu-sink/src/main/java/com/dtstack/flink/sql/sink/kudu/KuduSink.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import org.apache.flink.api.java.typeutils.RowTypeInfo;
1010
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
1111
import org.apache.flink.streaming.api.datastream.DataStream;
12+
import org.apache.flink.streaming.api.datastream.DataStreamSink;
1213
import org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction;
1314
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
1415
import org.apache.flink.table.sinks.RetractStreamTableSink;
@@ -63,7 +64,11 @@ public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
6364
.setFieldTypes(this.fieldTypes);
6465
KuduOutputFormat kuduOutputFormat = builder.finish();
6566
RichSinkFunction richSinkFunction = new OutputFormatSinkFunction(kuduOutputFormat);
66-
dataStream.addSink(richSinkFunction);
67+
DataStreamSink dataStreamSink = dataStream.addSink(richSinkFunction);
68+
dataStreamSink.name(tableName);
69+
if (parallelism > 0) {
70+
dataStreamSink.setParallelism(parallelism);
71+
}
6772
}
6873

6974
@Override

0 commit comments

Comments
 (0)