Skip to content

Commit 5db5b9d

Browse files
committed
Merge remote-tracking branch 'origin/hotfix_1.8_3.10.x_30426' into tmp_1.8_3.10.x_merge
# Conflicts: # flinkx-kudu/flinkx-kudu-writer/src/main/java/com/dtstack/flinkx/kudu/writer/KuduWriter.java
2 parents 0dd1203 + d45491c commit 5db5b9d

File tree

8 files changed

+24
-5
lines changed
  • flinkx-emqx/flinkx-emqx-writer/src/main/java/com/dtstack/flinkx/emqx/writer
  • flinkx-kafka09/flinkx-kafka09-writer/src/main/java/com/dtstack/flinkx/kafka09/writer
  • flinkx-kafka10/flinkx-kafka10-writer/src/main/java/com/dtstack/flinkx/kafka10/writer
  • flinkx-kafka11/flinkx-kafka11-writer/src/main/java/com/dtstack/flinkx/kafka11/writer
  • flinkx-kafka/flinkx-kafka-writer/src/main/java/com/dtstack/flinkx/kafka/writer
  • flinkx-kb/flinkx-kb-writer/src/main/java/com/dtstack/flinkx/kafkabase/writer
  • flinkx-kudu/flinkx-kudu-writer/src/main/java/com/dtstack/flinkx/kudu/writer
  • flinkx-restapi/flinkx-restapi-writer/src/main/java/com/dtstack/flinkx/restapi/writer

8 files changed

+24
-5
lines changed

flinkx-emqx/flinkx-emqx-writer/src/main/java/com/dtstack/flinkx/emqx/writer/EmqxWriter.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,9 @@ public DataStreamSink<?> writeData(DataStream<Row> dataSet) {
6666
builder.setPassword(password);
6767
builder.setCleanSession(isCleanSession);
6868
builder.setQos(qos);
69+
builder.setDirtyPath(dirtyPath);
70+
builder.setDirtyHadoopConfig(dirtyHadoopConfig);
71+
builder.setSrcCols(srcCols);
6972
return createOutput(dataSet, builder.finish());
7073
}
7174
}

flinkx-kafka/flinkx-kafka-writer/src/main/java/com/dtstack/flinkx/kafka/writer/KafkaWriter.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,9 @@ public DataStreamSink<?> writeData(DataStream<Row> dataSet) {
4747
format.setProducerSettings(producerSettings);
4848
format.setRestoreConfig(restoreConfig);
4949
format.setTableFields(tableFields);
50-
50+
format.setDirtyPath(dirtyPath);
51+
format.setDirtyHadoopConfig(dirtyHadoopConfig);
52+
format.setSrcFieldNames(srcCols);
5153
return createOutput(dataSet, format);
5254
}
5355
}

flinkx-kafka09/flinkx-kafka09-writer/src/main/java/com/dtstack/flinkx/kafka09/writer/Kafka09Writer.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,9 @@ public DataStreamSink<?> writeData(DataStream<Row> dataSet) {
5858
format.setBrokerList(brokerList);
5959
format.setProducerSettings(producerSettings);
6060
format.setRestoreConfig(restoreConfig);
61-
61+
format.setDirtyPath(dirtyPath);
62+
format.setDirtyHadoopConfig(dirtyHadoopConfig);
63+
format.setSrcFieldNames(srcCols);
6264
return createOutput(dataSet, format);
6365
}
6466
}

flinkx-kafka10/flinkx-kafka10-writer/src/main/java/com/dtstack/flinkx/kafka10/writer/Kafka10Writer.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,9 @@ public DataStreamSink<?> writeData(DataStream<Row> dataSet) {
4646
format.setTableFields(tableFields);
4747
format.setProducerSettings(producerSettings);
4848
format.setRestoreConfig(restoreConfig);
49-
49+
format.setDirtyPath(dirtyPath);
50+
format.setDirtyHadoopConfig(dirtyHadoopConfig);
51+
format.setSrcFieldNames(srcCols);
5052
return createOutput(dataSet, format);
5153
}
5254
}

flinkx-kafka11/flinkx-kafka11-writer/src/main/java/com/dtstack/flinkx/kafka11/writer/Kafka11Writer.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,9 @@ public DataStreamSink<?> writeData(DataStream<Row> dataSet) {
4646
format.setProducerSettings(producerSettings);
4747
format.setRestoreConfig(restoreConfig);
4848
format.setTableFields(tableFields);
49-
49+
format.setDirtyPath(dirtyPath);
50+
format.setDirtyHadoopConfig(dirtyHadoopConfig);
51+
format.setSrcFieldNames(srcCols);
5052
return createOutput(dataSet, format);
5153
}
5254
}

flinkx-kb/flinkx-kb-writer/src/main/java/com/dtstack/flinkx/kafkabase/writer/KafkaBaseWriter.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,9 @@ public DataStreamSink<?> writeData(DataStream<Row> dataSet) {
6262
format.setProducerSettings(producerSettings);
6363
format.setRestoreConfig(restoreConfig);
6464
format.setTableFields(tableFields);
65-
65+
format.setDirtyPath(dirtyPath);
66+
format.setDirtyHadoopConfig(dirtyHadoopConfig);
67+
format.setSrcFieldNames(srcCols);
6668
return createOutput(dataSet, format);
6769
}
6870
}

flinkx-kudu/flinkx-kudu-writer/src/main/java/com/dtstack/flinkx/kudu/writer/KuduWriter.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,9 @@ public DataStreamSink<?> writeData(DataStream<Row> dataSet) {
9595
builder.setErrors(errors);
9696
builder.setErrorRatio(errorRatio);
9797
builder.setHadoopConfig(hadoopConfig);
98+
builder.setDirtyPath(dirtyPath);
99+
builder.setDirtyHadoopConfig(dirtyHadoopConfig);
100+
builder.setSrcCols(srcCols);
98101
return createOutput(dataSet,builder.finish());
99102
}
100103
}

flinkx-restapi/flinkx-restapi-writer/src/main/java/com/dtstack/flinkx/restapi/writer/RestapiWriter.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,9 @@ public DataStreamSink<?> writeData(DataStream<Row> dataSet) {
9797
builder.setColumn(column);
9898
builder.setParams(params);
9999
builder.setBatchInterval(batchInterval);
100+
builder.setDirtyPath(dirtyPath);
101+
builder.setDirtyHadoopConfig(dirtyHadoopConfig);
102+
builder.setSrcCols(srcCols);
100103

101104
return createOutput(dataSet, builder.finish());
102105
}

0 commit comments

Comments
 (0)