Commit 5f25a85

Merge remote-tracking branch 'origin/v1.5.0_dev' into v1.8.0_dev

2 parents 0b5c515 + 736c42d

13 files changed: +83, -13 lines

13 files changed

+83
-13
lines changed

kafka09/kafka09-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java

Lines changed: 12 additions & 4 deletions
@@ -64,6 +64,9 @@ public class KafkaSink implements RetractStreamTableSink<Row>, IStreamSinkGener<
     /** Partitioner to select Kafka partition for each item. */
     protected Optional<FlinkKafkaPartitioner<Row>> partitioner;
 
+    protected int parallelism;
+
+
 
     @Override
     public KafkaSink genStreamSink(TargetTableInfo targetTableInfo) {
@@ -85,11 +88,16 @@ public KafkaSink genStreamSink(TargetTableInfo targetTableInfo) {
         this.fieldTypes = types;
 
         TableSchema.Builder schemaBuilder = TableSchema.builder();
-        for (int i=0;i<fieldNames.length;i++){
-            schemaBuilder.field(fieldNames[i], fieldTypes[i]);
-        }
+        for (int i=0;i<fieldNames.length;i++) {
+            schemaBuilder.field(fieldNames[i], fieldTypes[i]);
+        }
         this.schema = schemaBuilder.build();
 
+        Integer parallelism = kafka09SinkTableInfo.getParallelism();
+        if (parallelism != null) {
+            this.parallelism = parallelism;
+        }
+
         this.serializationSchema = new CustomerJsonRowSerializationSchema(getOutputType().getTypeAt(1));
         return this;
     }
@@ -111,7 +119,7 @@ public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
 
         DataStream<Row> ds = dataStream.map((Tuple2<Boolean, Row> record) -> {
             return record.f1;
-        }).returns(getOutputType().getTypeAt(1));
+        }).returns(getOutputType().getTypeAt(1)).setParallelism(parallelism);
 
         kafkaTableSink.emitDataStream(ds);
     }
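
Note on the pattern: the sink keeps a primitive int parallelism field but reads a boxed Integer from the table info, so an unconfigured property is detectable as null. A minimal sketch of the accessors this hunk assumes, written as a hypothetical standalone class rather than the repo's actual KafkaSinkTableInfo:

    // Hypothetical sketch of the table-info accessors called above; the
    // field, key constant, and default are assumptions, not the repo's code.
    public class KafkaSinkTableInfoSketch {

        /** The parser looks this up lower-cased, so presumably "parallelism". */
        public static final String PARALLELISM_KEY = "parallelism";

        /** Boxed, so an absent table property can stay null instead of 0. */
        private Integer parallelism = 1; // assumed positive default, see below

        public Integer getParallelism() {
            return parallelism;
        }

        public void setParallelism(Integer parallelism) {
            if (parallelism != null) { // keep the default when unset
                this.parallelism = parallelism;
            }
        }
    }

The assumed positive default matters: if getParallelism() ever returned null here, the sink's int field would keep its default of 0, and the setParallelism(parallelism) call in emitDataStream would fail, since Flink requires operator parallelism of at least 1. The kafka10 and kafka11 sinks below apply the identical change.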

kafka09/kafka09-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/table/KafkaSinkParser.java

Lines changed: 4 additions & 0 deletions
@@ -42,6 +42,10 @@ public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, O
 
         kafka09SinkTableInfo.setBootstrapServers(MathUtil.getString(props.get(KafkaSinkTableInfo.BOOTSTRAPSERVERS_KEY.toLowerCase())));
         kafka09SinkTableInfo.setTopic(MathUtil.getString(props.get(KafkaSinkTableInfo.TOPIC_KEY.toLowerCase())));
+
+        Integer parallelism = MathUtil.getIntegerVal(props.get(KafkaSinkTableInfo.PARALLELISM_KEY.toLowerCase()));
+        kafka09SinkTableInfo.setParallelism(parallelism);
+
         for (String key : props.keySet()) {
             if (!key.isEmpty() && key.startsWith("kafka.")) {
                 kafka09SinkTableInfo.addKafkaParam(key.substring(6), props.get(key).toString());
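
For orientation, a sketch in Java of the props map getTableInfo receives, with illustrative keys and values. The PARALLELISM_KEY.toLowerCase() lookup, like the two above it, suggests keys arrive lower-cased, and MathUtil.getIntegerVal presumably yields null when the key is absent:

    import java.util.HashMap;
    import java.util.Map;

    // Illustrative only: roughly what a table's WITH clause contributes.
    public class SinkPropsExample {
        public static void main(String[] args) {
            Map<String, Object> props = new HashMap<>();
            props.put("type", "kafka09");                    // selects this sink
            props.put("bootstrapservers", "localhost:9092"); // BOOTSTRAPSERVERS_KEY
            props.put("topic", "mqTest");                    // TOPIC_KEY
            props.put("parallelism", "2");                   // new in this commit
            props.put("kafka.acks", "all");                  // "kafka." prefix stripped,
                                                             // forwarded as "acks"
            System.out.println(props);
        }
    }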

kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java

Lines changed: 8 additions & 1 deletion
@@ -29,6 +29,7 @@
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
@@ -114,6 +115,12 @@ public Table genStreamSource(SourceTableInfo sourceTableInfo, StreamExecutionEnv
 
         String fields = StringUtils.join(kafka09SourceTableInfo.getFields(), ",");
         String sourceOperatorName = SOURCE_OPERATOR_NAME_TPL.replace("${topic}", topicName).replace("${table}", sourceTableInfo.getName());
-        return tableEnv.fromDataStream(env.addSource(kafkaSrc, sourceOperatorName, typeInformation), fields);
+
+        DataStreamSource kafkaSource = env.addSource(kafkaSrc, sourceOperatorName, typeInformation);
+        Integer parallelism = kafka09SourceTableInfo.getParallelism();
+        if (parallelism != null) {
+            kafkaSource.setParallelism(parallelism);
+        }
+        return tableEnv.fromDataStream(kafkaSource, fields);
     }
 }
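
The one-liner is split so the DataStreamSource is still in hand when setParallelism is applied, before the stream is wrapped into a Table; the kafka10 and kafka11 sources below get the identical treatment. A self-contained sketch of the pattern against the plain DataStream API, with the topic, group id, and parallelism stand-in all illustrative:

    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;

    public class SourceParallelismSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092");
            props.setProperty("group.id", "demo");

            FlinkKafkaConsumer09<String> consumer =
                    new FlinkKafkaConsumer09<>("mqTest", new SimpleStringSchema(), props);

            // Keep a handle on the DataStreamSource so parallelism is set on
            // the source operator itself, as the hunk above does.
            DataStreamSource<String> source = env.addSource(consumer);

            Integer parallelism = readParallelism(); // may be null, like getParallelism()
            if (parallelism != null) {
                // Parallelism beyond the topic's partition count just leaves
                // the extra source subtasks idle, so size this to the topic.
                source.setParallelism(parallelism);
            }

            source.print();
            env.execute("source-parallelism-sketch");
        }

        // Stand-in for kafka09SourceTableInfo.getParallelism().
        private static Integer readParallelism() {
            return 2;
        }
    }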

kafka10/kafka10-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java

Lines changed: 9 additions & 1 deletion
@@ -58,6 +58,8 @@ public class KafkaSink implements RetractStreamTableSink<Row>, IStreamSinkGener<
 
     protected Properties properties;
 
+    protected int parallelism;
+
     /** Serialization schema for encoding records to Kafka. */
     protected SerializationSchema serializationSchema;
 
@@ -87,12 +89,18 @@ public KafkaSink genStreamSink(TargetTableInfo targetTableInfo) {
         }
         this.fieldTypes = types;
 
+
         TableSchema.Builder schemaBuilder = TableSchema.builder();
         for (int i=0;i<fieldNames.length;i++){
             schemaBuilder.field(fieldNames[i], fieldTypes[i]);
         }
         this.schema = schemaBuilder.build();
 
+        Integer parallelism = kafka10SinkTableInfo.getParallelism();
+        if (parallelism != null) {
+            this.parallelism = parallelism;
+        }
+
         this.serializationSchema = new CustomerJsonRowSerializationSchema(getOutputType().getTypeAt(1));
         return this;
     }
@@ -114,7 +122,7 @@ public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
 
         DataStream<Row> ds = dataStream.map((Tuple2<Boolean, Row> record) -> {
             return record.f1;
-        }).returns(getOutputType().getTypeAt(1));
+        }).returns(getOutputType().getTypeAt(1)).setParallelism(parallelism);
 
         kafkaTableSink.emitDataStream(ds);
     }

kafka10/kafka10-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/table/KafkaSinkParser.java

Lines changed: 4 additions & 0 deletions
@@ -42,6 +42,10 @@ public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, O
 
         kafka10SinkTableInfo.setBootstrapServers(MathUtil.getString(props.get(KafkaSinkTableInfo.BOOTSTRAPSERVERS_KEY.toLowerCase())));
         kafka10SinkTableInfo.setTopic(MathUtil.getString(props.get(KafkaSinkTableInfo.TOPIC_KEY.toLowerCase())));
+
+        Integer parallelism = MathUtil.getIntegerVal(props.get(KafkaSinkTableInfo.PARALLELISM_KEY.toLowerCase()));
+        kafka10SinkTableInfo.setParallelism(parallelism);
+
         for (String key : props.keySet()) {
             if (!key.isEmpty() && key.startsWith("kafka.")) {
                 kafka10SinkTableInfo.addKafkaParam(key.substring(6), props.get(key).toString());

kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java

Lines changed: 8 additions & 1 deletion
@@ -28,6 +28,7 @@
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
@@ -116,6 +117,12 @@ public Table genStreamSource(SourceTableInfo sourceTableInfo, StreamExecutionEnv
 
         String fields = StringUtils.join(kafka010SourceTableInfo.getFields(), ",");
         String sourceOperatorName = SOURCE_OPERATOR_NAME_TPL.replace("${topic}", topicName).replace("${table}", sourceTableInfo.getName());
-        return tableEnv.fromDataStream(env.addSource(kafkaSrc, sourceOperatorName, typeInformation), fields);
+
+        DataStreamSource kafkaSource = env.addSource(kafkaSrc, sourceOperatorName, typeInformation);
+        Integer parallelism = kafka010SourceTableInfo.getParallelism();
+        if (parallelism != null) {
+            kafkaSource.setParallelism(parallelism);
+        }
+        return tableEnv.fromDataStream(kafkaSource, fields);
     }
 }

kafka11/kafka11-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java

Lines changed: 11 additions & 2 deletions
@@ -56,6 +56,8 @@ public class KafkaSink implements RetractStreamTableSink<Row>, IStreamSinkGener
 
     protected String topic;
 
+    protected int parallelism;
+
     protected Properties properties;
 
     /** Serialization schema for encoding records to Kafka. */
@@ -88,10 +90,16 @@ public KafkaSink genStreamSink(TargetTableInfo targetTableInfo) {
         this.fieldTypes = types;
 
         TableSchema.Builder schemaBuilder = TableSchema.builder();
-        for (int i=0;i<fieldNames.length;i++){
+        for (int i=0;i<fieldNames.length;i++) {
             schemaBuilder.field(fieldNames[i], fieldTypes[i]);
         }
         this.schema = schemaBuilder.build();
+
+        Integer parallelism = kafka11SinkTableInfo.getParallelism();
+        if (parallelism != null) {
+            this.parallelism = parallelism;
+        }
+
         this.serializationSchema = new CustomerJsonRowSerializationSchema(getOutputType().getTypeAt(1));
         return this;
     }
@@ -110,9 +118,10 @@ public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
                 partitioner,
                 serializationSchema
         );
+
        DataStream<Row> ds = dataStream.map((Tuple2<Boolean, Row> record) -> {
            return record.f1;
-        }).returns(getOutputType().getTypeAt(1));
+        }).returns(getOutputType().getTypeAt(1)).setParallelism(parallelism);
 
         kafkaTableSink.emitDataStream(ds);
     }

kafka11/kafka11-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/table/KafkaSinkParser.java

Lines changed: 4 additions & 0 deletions
@@ -50,6 +50,10 @@ public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, O
 
         kafka11SinkTableInfo.setBootstrapServers(MathUtil.getString(props.get(KafkaSinkTableInfo.BOOTSTRAPSERVERS_KEY.toLowerCase())));
         kafka11SinkTableInfo.setTopic(MathUtil.getString(props.get(KafkaSinkTableInfo.TOPIC_KEY.toLowerCase())));
+
+        Integer parallelism = MathUtil.getIntegerVal(props.get(KafkaSinkTableInfo.PARALLELISM_KEY.toLowerCase()));
+        kafka11SinkTableInfo.setParallelism(parallelism);
+
         for (String key : props.keySet()) {
             if (!key.isEmpty() && key.startsWith("kafka.")) {
                 kafka11SinkTableInfo.addKafkaParam(key.substring(6), props.get(key).toString());

kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java

Lines changed: 8 additions & 1 deletion
@@ -28,6 +28,7 @@
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
@@ -116,6 +117,12 @@ public Table genStreamSource(SourceTableInfo sourceTableInfo, StreamExecutionEnv
 
         String fields = StringUtils.join(kafka011SourceTableInfo.getFields(), ",");
         String sourceOperatorName = SOURCE_OPERATOR_NAME_TPL.replace("${topic}", topicName).replace("${table}", sourceTableInfo.getName());
-        return tableEnv.fromDataStream(env.addSource(kafkaSrc, sourceOperatorName, typeInformation), fields);
+
+        DataStreamSource kafkaSource = env.addSource(kafkaSrc, sourceOperatorName, typeInformation);
+        Integer parallelism = kafka011SourceTableInfo.getParallelism();
+        if (parallelism != null) {
+            kafkaSource.setParallelism(parallelism);
+        }
+        return tableEnv.fromDataStream(kafkaSource, fields);
     }
 }

mysql/mysql-side/mysql-async-side/src/main/java/com/dtstack/flink/sql/side/mysql/MysqlAsyncReqRow.java

Lines changed: 2 additions & 1 deletion
@@ -69,11 +69,12 @@ public void open(Configuration parameters) throws Exception {
                 .put("idle_connection_test_period", DEFAULT_IDLE_CONNECTION_TEST_PEROID)
                 .put("test_connection_on_checkin", DEFAULT_TEST_CONNECTION_ON_CHECKIN);
 
-
+        System.setProperty("vertx.disableFileCPResolving", "true");
 
         VertxOptions vo = new VertxOptions();
         vo.setEventLoopPoolSize(DEFAULT_VERTX_EVENT_LOOP_POOL_SIZE);
         vo.setWorkerPoolSize(DEFAULT_VERTX_WORKER_POOL_SIZE);
+        vo.setFileResolverCachingEnabled(false);
         Vertx vertx = Vertx.vertx(vo);
         setRdbSQLClient(JDBCClient.createNonShared(vertx, mysqlClientConfig));
     }
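
The two additions address the same problem from both ends: the vertx.disableFileCPResolving system property stops Vert.x from copying classpath resources into an on-disk .vertx cache directory, and setFileResolverCachingEnabled(false) turns off the file-resolver cache itself. That combination is a common fix when Vert.x runs inside long-lived containers such as Flink TaskManagers, where the working directory may be read-only or cleaned up underneath the process. A condensed, self-contained sketch of the resulting setup, with the DEFAULT_* pool sizes and the MySQL config replaced by illustrative literals:

    import io.vertx.core.Vertx;
    import io.vertx.core.VertxOptions;
    import io.vertx.core.json.JsonObject;
    import io.vertx.ext.jdbc.JDBCClient;

    public class VertxClientSetupSketch {
        public static void main(String[] args) {
            // Must run before Vertx.vertx(): stops Vert.x from resolving
            // classpath files into a .vertx directory on disk.
            System.setProperty("vertx.disableFileCPResolving", "true");

            VertxOptions vo = new VertxOptions();
            vo.setEventLoopPoolSize(1);              // illustrative; the diff
            vo.setWorkerPoolSize(10);                // uses DEFAULT_* constants
            vo.setFileResolverCachingEnabled(false); // no file-resolver caching

            Vertx vertx = Vertx.vertx(vo);

            // Illustrative JDBC config; MysqlAsyncReqRow builds its own from
            // the side table's connection settings.
            JsonObject mysqlClientConfig = new JsonObject()
                    .put("url", "jdbc:mysql://localhost:3306/demo")
                    .put("driver_class", "com.mysql.jdbc.Driver")
                    .put("user", "root")
                    .put("password", "root");

            JDBCClient client = JDBCClient.createNonShared(vertx, mysqlClientConfig);
            System.out.println("async MySQL client ready: " + (client != null));
        }
    }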
