Skip to content

Commit 266643f

Browse files
Authored by du
[Flink] Fix bug where Flink CDC task wrote files with the wrong bucket number (lakesoul-io#584)
* fix_flink_write_file_bucket_bug Signed-off-by: fphantam <dongf@dmetasoul.com> * fix invalid hash_bucket_num Signed-off-by: fphantam <dongf@dmetasoul.com> * set lakesoul.sink.dynamic_bucketing default value to true Signed-off-by: fphantam <dongf@dmetasoul.com> * fix PartitioningAsyncWriter with aux_sort_cols Signed-off-by: fphantam <dongf@dmetasoul.com> --------- Signed-off-by: fphantam <dongf@dmetasoul.com>
1 parent cbdab7c commit 266643f

File tree

12 files changed

+42
-16
lines changed

12 files changed

+42
-16
lines changed

lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/JdbcCDC.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ public static void main(String[] args) throws Exception {
107107
conf.set(LakeSoulSinkOptions.WAREHOUSE_PATH, databasePrefixPath);
108108
conf.set(LakeSoulSinkOptions.SOURCE_PARALLELISM, sourceParallelism);
109109
conf.set(LakeSoulSinkOptions.BUCKET_PARALLELISM, bucketParallelism);
110+
conf.set(LakeSoulSinkOptions.HASH_BUCKET_NUM, bucketParallelism);
110111
conf.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
111112
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
112113
env.getConfig().registerTypeWithKryoSerializer(BinarySourceRecord.class, BinarySourceRecordSerializer.class);

lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/MysqlCdc.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ public static void main(String[] args) throws Exception {
8080
conf.set(LakeSoulSinkOptions.WAREHOUSE_PATH, databasePrefixPath);
8181
conf.set(LakeSoulSinkOptions.SOURCE_PARALLELISM, sourceParallelism);
8282
conf.set(LakeSoulSinkOptions.BUCKET_PARALLELISM, bucketParallelism);
83+
conf.set(LakeSoulSinkOptions.HASH_BUCKET_NUM, bucketParallelism);
8384
conf.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
8485

8586
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/OracleCdc.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ public static void main(String[] args) throws Exception {
7676
conf.set(LakeSoulSinkOptions.isMultiTableSource, true);
7777
conf.set(SOURCE_PARALLELISM, sourceParallelism);
7878
conf.set(BUCKET_PARALLELISM, bucketParallelism);
79+
conf.set(HASH_BUCKET_NUM, bucketParallelism);
7980
conf.set(SERVER_TIME_ZONE, serverTimezone);
8081
conf.set(WAREHOUSE_PATH, databasePrefixPath);
8182
conf.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);

lakesoul-flink/src/main/java/org/apache/flink/lakesoul/metadata/LakeSoulCatalog.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -307,7 +307,7 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
307307
}
308308
} else {
309309
// for non-primary key table, hashBucketNum properties should not be set
310-
if (tableOptions.containsKey(HASH_BUCKET_NUM.key())) {
310+
if (tableOptions.containsKey(HASH_BUCKET_NUM.key()) && !tableOptions.get(HASH_BUCKET_NUM.key()).equals("-1")) {
311311
throw new CatalogException("hashBucketNum property should not be set for table without primary key");
312312
}
313313
}

lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/LakeSoulMultiTableSinkStreamBuilder.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public DataStream<BinarySourceRecord> buildHashPartitionedCDCStream(DataStream<B
5555
}
5656

5757
public DataStreamSink<BinarySourceRecord> buildLakeSoulDMLSink(DataStream<BinarySourceRecord> stream) {
58-
context.conf.set(DYNAMIC_BUCKETING, false);
58+
Boolean dynamicBucketing = context.conf.get(DYNAMIC_BUCKETING);
5959
if (!context.conf.contains(AUTO_SCHEMA_CHANGE)) {
6060
context.conf.set(AUTO_SCHEMA_CHANGE, true);
6161
}
@@ -71,8 +71,12 @@ public DataStreamSink<BinarySourceRecord> buildLakeSoulDMLSink(DataStream<Binary
7171
.withRollingPolicy(rollingPolicy)
7272
.withOutputFileConfig(fileNameConfig)
7373
.build();
74-
return stream.sinkTo(sink).name("LakeSoul MultiTable DML Sink")
75-
.setParallelism(context.conf.getInteger(BUCKET_PARALLELISM));
74+
if (dynamicBucketing) {
75+
return stream.sinkTo(sink).name("LakeSoul MultiTable DML Sink");
76+
} else {
77+
return stream.sinkTo(sink).name("LakeSoul MultiTable DML Sink")
78+
.setParallelism(context.conf.getInteger(BUCKET_PARALLELISM));
79+
}
7680
}
7781

7882
public static DataStreamSink<LakeSoulArrowWrapper> buildArrowSink(Context context,

lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/bucket/DefaultOneTableBulkFormatBuilder.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,15 +33,15 @@ public DefaultOneTableBulkFormatBuilder(
3333
super(basePath, conf, new DefaultLakeSoulWriterBucketFactory(conf));
3434
this.identity = identity;
3535
}
36-
public TableSchemaIdentity getIdentity(){
36+
public TableSchemaIdentity getIdentity() {
3737
return this.identity;
3838
}
3939

4040
@Override
4141
public AbstractLakeSoulMultiTableSinkWriter<RowData, RowData> createWriter(Sink.InitContext context, int subTaskId) throws
4242
IOException {
4343
int hashBucketNum = conf.getInteger(LakeSoulSinkOptions.HASH_BUCKET_NUM);
44-
int hashBucketId = hashBucketNum == -1 ? subTaskId : subTaskId % hashBucketNum;
44+
int hashBucketId = hashBucketNum == -1 ? 0 : subTaskId % hashBucketNum;
4545
System.out.printf("DefaultOneTableBulkFormatBuilder::createWriter, subTaskId=%d, hashBucketId=%d\n", subTaskId, hashBucketId);
4646
return new LakeSoulRowDataOneTableSinkWriter(
4747
hashBucketId,

lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/committer/LakeSoulSinkGlobalCommitter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ public List<LakeSoulMultiTableSinkGlobalCommittable> commit(
153153
identity.tableLocation, msgSchema, identity.useCDC, identity.cdcColumn, partition);
154154
JSONObject properties = new JSONObject();
155155
if (!identity.primaryKeys.isEmpty()) {
156-
properties.put(HASH_BUCKET_NUM.key(), Integer.toString(conf.getInteger(BUCKET_PARALLELISM)));
156+
properties.put(HASH_BUCKET_NUM.key(), Integer.toString(conf.getInteger(HASH_BUCKET_NUM)));
157157
properties.put(HASH_PARTITIONS,
158158
String.join(LAKESOUL_HASH_PARTITION_SPLITTER, identity.primaryKeys));
159159
if (isCdc) {

lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/FlinkUtil.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -385,7 +385,12 @@ public static String getDatabaseName(String fullDatabaseName) {
385385
}
386386

387387
public static void setIOConfigs(Configuration conf, NativeIOBase io) {
388-
conf.addAll(GlobalConfiguration.loadConfiguration());
388+
Configuration globalConf = GlobalConfiguration.loadConfiguration();
389+
globalConf.keySet().forEach(key -> {
390+
if (!conf.containsKey(key)) {
391+
conf.setString(key, globalConf.getString(key, null));
392+
}
393+
});
389394
try {
390395
FlinkUtil.class.getClassLoader().loadClass("org.apache.hadoop.hdfs.HdfsConfiguration");
391396
org.apache.hadoop.conf.Configuration

lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/NativeOptions.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ public class NativeOptions {
1515
.withDescription("Option to set memory limit of native writer");
1616

1717
public static final ConfigOption<String> HASH_BUCKET_ID =
18-
key("lakesoul.native_writer.hash_bucket_id")
18+
key("hash_bucket_id")
1919
.stringType()
2020
.defaultValue("0")
2121
.withDescription("Option to set hash bucket id of native writer");

native-io/lakesoul-io-java/src/main/java/com/dmetasoul/lakesoul/lakesoul/io/NativeIOWriter.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ public void setAuxSortColumns(Iterable<String> auxSortColumns) {
7373
}
7474

7575
public void setHashBucketNum(Integer hashBucketNum) {
76+
hashBucketNum = hashBucketNum < 1 ? 1 : hashBucketNum;
7677
ioConfigBuilder = libLakeSoulIO.lakesoul_config_builder_set_hash_bucket_num(ioConfigBuilder, hashBucketNum);
7778
}
7879

0 commit comments

Comments (0)