diff --git a/docs/layouts/shortcodes/generated/kafka_sync_database.html b/docs/layouts/shortcodes/generated/kafka_sync_database.html index f5a37300c6f9..efba21b8d9dc 100644 --- a/docs/layouts/shortcodes/generated/kafka_sync_database.html +++ b/docs/layouts/shortcodes/generated/kafka_sync_database.html @@ -90,6 +90,11 @@ + +
--eager_init
+ It is default false. If true, all relevant tables commiter will be initialized eagerly, which means those tables could be forced to create snapshot. + +
--partition_keys
The partition keys for Paimon table. If there are multiple partition keys, connect them with comma, for example "dt,hh,mm". diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java index c107500eba86..be63b5a43fff 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java @@ -69,6 +69,7 @@ public class CdcActionCommonUtils { public static final String COMPUTED_COLUMN = "computed_column"; public static final String METADATA_COLUMN = "metadata_column"; public static final String MULTIPLE_TABLE_PARTITION_KEYS = "multiple_table_partition_keys"; + public static final String EAGER_INIT = "eager_init"; public static void assertSchemaCompatible( TableSchema paimonSchema, List sourceTableFields) { diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java index d876fe484b50..267d682ca698 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java @@ -21,6 +21,7 @@ import org.apache.paimon.catalog.Catalog; import org.apache.paimon.flink.action.Action; import org.apache.paimon.flink.action.MultiTablesSinkMode; +import org.apache.paimon.flink.sink.TableFilter; import org.apache.paimon.flink.sink.cdc.EventParser; import org.apache.paimon.flink.sink.cdc.FlinkCdcSyncDatabaseSinkBuilder; import org.apache.paimon.flink.sink.cdc.NewTableSchemaBuilder; @@ -48,6 +49,7 @@ /** Base {@link Action} for synchronizing into one Paimon database. */ public abstract class SyncDatabaseActionBase extends SynchronizationActionBase { + protected boolean eagerInit = false; protected boolean mergeShards = true; protected MultiTablesSinkMode mode = COMBINED; protected String tablePrefix = ""; @@ -81,6 +83,11 @@ public SyncDatabaseActionBase mergeShards(boolean mergeShards) { return this; } + public SyncDatabaseActionBase eagerInit(boolean eagerInit) { + this.eagerInit = eagerInit; + return this; + } + public SyncDatabaseActionBase withMode(MultiTablesSinkMode mode) { this.mode = mode; return this; @@ -227,6 +234,13 @@ protected EventParser.Factory buildEventParserFactory() protected void buildSink( DataStream input, EventParser.Factory parserFactory) { + + List whiteList = new ArrayList<>(tableMapping.values()); + List prefixList = new ArrayList<>(dbPrefix.values()); + prefixList.add(tablePrefix); + List suffixList = new ArrayList<>(dbSuffix.values()); + suffixList.add(tableSuffix); + new FlinkCdcSyncDatabaseSinkBuilder() .withInput(input) .withParserFactory(parserFactory) @@ -236,6 +250,15 @@ protected void buildSink( .withTables(tables) .withMode(mode) .withTableOptions(tableConfig) + .withEagerInit(eagerInit) + .withTableFilter( + new TableFilter( + database, + whiteList, + prefixList, + suffixList, + includingTables, + excludingTables)) .build(); } } diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java index 4c7bd9cc1b88..52bfb7271c8a 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java @@ -24,6 +24,7 @@ import java.util.Optional; +import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.EAGER_INIT; import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.EXCLUDING_DBS; import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.EXCLUDING_TABLES; import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.INCLUDING_DBS; @@ -63,7 +64,8 @@ protected void withParams(MultipleParameterToolAdapter params, T action) { .excludingDbs(params.get(EXCLUDING_DBS)) .withPartitionKeyMultiple( optionalConfigMapList(params, MULTIPLE_TABLE_PARTITION_KEYS)) - .withPartitionKeys(); + .withPartitionKeys() + .eagerInit(Boolean.valueOf(params.get(EAGER_INIT))); if (params.has(PARTITION_KEYS)) { action.withPartitionKeys(params.get(PARTITION_KEYS).split(",")); diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java index a6c1eb2373f2..a825634a0925 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java @@ -31,6 +31,7 @@ import org.apache.paimon.flink.sink.StoreMultiCommitter; import org.apache.paimon.flink.sink.StoreSinkWrite; import org.apache.paimon.flink.sink.StoreSinkWriteImpl; +import org.apache.paimon.flink.sink.TableFilter; import org.apache.paimon.flink.sink.WrappedManifestCommittableSerializer; import org.apache.paimon.manifest.WrappedManifestCommittable; import org.apache.paimon.options.MemorySize; @@ -46,6 +47,7 @@ import javax.annotation.Nullable; import java.io.Serializable; +import java.util.Collections; import static org.apache.paimon.flink.sink.FlinkSink.assertStreamingConfiguration; import static org.apache.paimon.flink.sink.FlinkSink.configureGlobalCommitter; @@ -65,16 +67,22 @@ public class FlinkCdcMultiTableSink implements Serializable { private final double commitCpuCores; @Nullable private final MemorySize commitHeapMemory; private final String commitUser; + private boolean eagerInit = false; + private TableFilter tableFilter; public FlinkCdcMultiTableSink( CatalogLoader catalogLoader, double commitCpuCores, @Nullable MemorySize commitHeapMemory, - String commitUser) { + String commitUser, + boolean eagerInit, + TableFilter tableFilter) { this.catalogLoader = catalogLoader; this.commitCpuCores = commitCpuCores; this.commitHeapMemory = commitHeapMemory; this.commitUser = commitUser; + this.eagerInit = eagerInit; + this.tableFilter = tableFilter; } private StoreSinkWrite.WithWriteBufferProvider createWriteProvider() { @@ -128,7 +136,7 @@ public DataStreamSink sinkFrom( true, false, commitUser, - createCommitterFactory(), + createCommitterFactory(tableFilter), createCommittableStateManager())); forwardParallelism(committed, input); configureGlobalCommitter(committed, commitCpuCores, commitHeapMemory); @@ -144,12 +152,20 @@ public DataStreamSink sinkFrom( // Table committers are dynamically created at runtime protected Committer.Factory - createCommitterFactory() { + createCommitterFactory(TableFilter tableFilter) { + // If checkpoint is enabled for streaming job, we have to // commit new files list even if they're empty. // Otherwise we can't tell if the commit is successful after // a restart. - return context -> new StoreMultiCommitter(catalogLoader, context); + return context -> + new StoreMultiCommitter( + catalogLoader, + context, + false, + Collections.emptyMap(), + eagerInit, + tableFilter); } protected CommittableStateManager createCommittableStateManager() { diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java index 1d25929716e4..b15a17a679b2 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java @@ -24,6 +24,7 @@ import org.apache.paimon.flink.action.MultiTablesSinkMode; import org.apache.paimon.flink.action.cdc.TypeMapping; import org.apache.paimon.flink.sink.FlinkWriteSink; +import org.apache.paimon.flink.sink.TableFilter; import org.apache.paimon.flink.utils.SingleOutputStreamOperatorUtils; import org.apache.paimon.options.MemorySize; import org.apache.paimon.options.Options; @@ -79,8 +80,10 @@ public class FlinkCdcSyncDatabaseSinkBuilder { // database to sync, currently only support single database private String database; + private boolean eagerInit; private MultiTablesSinkMode mode; private String commitUser; + private TableFilter tableFilter; public FlinkCdcSyncDatabaseSinkBuilder withInput(DataStream input) { this.input = input; @@ -125,6 +128,16 @@ public FlinkCdcSyncDatabaseSinkBuilder withMode(MultiTablesSinkMode mode) { return this; } + public FlinkCdcSyncDatabaseSinkBuilder withEagerInit(boolean eagerInit) { + this.eagerInit = eagerInit; + return this; + } + + public FlinkCdcSyncDatabaseSinkBuilder withTableFilter(TableFilter tableFilter) { + this.tableFilter = tableFilter; + return this; + } + public FlinkCdcSyncDatabaseSinkBuilder withTypeMapping(TypeMapping typeMapping) { this.typeMapping = typeMapping; return this; @@ -176,7 +189,12 @@ private void buildCombinedCdcSink() { FlinkCdcMultiTableSink sink = new FlinkCdcMultiTableSink( - catalogLoader, committerCpu, committerMemory, commitUser); + catalogLoader, + committerCpu, + committerMemory, + commitUser, + eagerInit, + tableFilter); sink.sinkFrom(partitioned); } diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSinkTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSinkTest.java index ab81e37c7d04..2371fbfb9dfe 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSinkTest.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSinkTest.java @@ -52,7 +52,9 @@ public void testTransformationParallelism() { () -> FlinkCatalogFactory.createPaimonCatalog(new Options()), FlinkConnectorOptions.SINK_COMMITTER_CPU.defaultValue(), null, - UUID.randomUUID().toString()); + UUID.randomUUID().toString(), + false, + null); DataStreamSink dataStreamSink = sink.sinkFrom(input); // check the transformation graph diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java index 01acddb9ad99..67b2b6bd4627 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java @@ -57,6 +57,8 @@ public class StoreMultiCommitter private final boolean ignoreEmptyCommit; private final Map dynamicOptions; + private final TableFilter tableFilter; + public StoreMultiCommitter(CatalogLoader catalogLoader, Context context) { this(catalogLoader, context, false, Collections.emptyMap()); } @@ -66,11 +68,28 @@ public StoreMultiCommitter( Context context, boolean ignoreEmptyCommit, Map dynamicOptions) { + this(catalogLoader, context, ignoreEmptyCommit, dynamicOptions, false, null); + } + + public StoreMultiCommitter( + CatalogLoader catalogLoader, + Context context, + boolean ignoreEmptyCommit, + Map dynamicOptions, + boolean eagerInit, + TableFilter tableFilter) { this.catalog = catalogLoader.load(); this.context = context; this.ignoreEmptyCommit = ignoreEmptyCommit; this.dynamicOptions = dynamicOptions; this.tableCommitters = new HashMap<>(); + + this.tableFilter = tableFilter; + + if (eagerInit) { + List tableIds = filterTables(); + tableIds.stream().forEach(this::getStoreCommitter); + } } @Override @@ -218,4 +237,19 @@ public void close() throws Exception { catalog.close(); } } + + private List filterTables() { + // Get all tables in the catalog + List allTables = null; + try { + allTables = catalog.listTables(this.tableFilter.getDbName()); + } catch (Catalog.DatabaseNotExistException e) { + allTables = Collections.emptyList(); + } + + List tblList = tableFilter.filterTables(allTables); + return tblList.stream() + .map(t -> Identifier.create(tableFilter.getDbName(), t)) + .collect(Collectors.toList()); + } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableFilter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableFilter.java new file mode 100644 index 000000000000..cfe04957924e --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableFilter.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.sink; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** TableFilter is used to filter tables according to whitelist and prefix/suffix patterns. */ +public class TableFilter implements java.io.Serializable { + private String dbName; + private List tableWhitelist; + private List tablePrefixes; + private List tableSuffixes; + private String tblIncludingPattern = ".*"; + private String tblExcludingPattern = ""; + + public TableFilter( + String dbName, + List tableWhitelist, + List tablePrefixes, + List tableSuffixes, + String tblIncludingPattern, + String tblExcludingPattern) { + this.dbName = dbName; + this.tableWhitelist = tableWhitelist; + this.tablePrefixes = tablePrefixes; + this.tableSuffixes = tableSuffixes; + this.tblIncludingPattern = tblIncludingPattern; + this.tblExcludingPattern = tblExcludingPattern; + } + + public String getDbName() { + return dbName; + } + + public void setDbName(String dbName) { + this.dbName = dbName; + } + + public List getTableWhitelist() { + return tableWhitelist; + } + + public void setTableWhitelist(List tableWhitelist) { + this.tableWhitelist = tableWhitelist; + } + + public List getTablePrefixes() { + return tablePrefixes; + } + + public void setTablePrefixes(List tablePrefixes) { + this.tablePrefixes = tablePrefixes; + } + + public List getTableSuffixes() { + return tableSuffixes; + } + + public void setTableSuffixes(List tableSuffixes) { + this.tableSuffixes = tableSuffixes; + } + + public String getTblIncludingPattern() { + return tblIncludingPattern; + } + + public void setTblIncludingPattern(String tblIncludingPattern) { + this.tblIncludingPattern = tblIncludingPattern; + } + + public String getTblExcludingPattern() { + return tblExcludingPattern; + } + + public void setTblExcludingPattern(String tblExcludingPattern) { + this.tblExcludingPattern = tblExcludingPattern; + } + + public List filterTables(List allTables) { + List inPatternList = Arrays.asList(tblIncludingPattern.split("\\|")); + List exPatternList = + (tblExcludingPattern == null || tblExcludingPattern.isEmpty()) + ? Collections.emptyList() + : Arrays.asList(tblExcludingPattern.split("\\|")); + String inPattern = + inPatternList.stream() + .flatMap( + p -> + tablePrefixes.isEmpty() + ? Stream.of(p) + : tablePrefixes.stream().map(prefix -> prefix + p)) + .flatMap( + p -> + tableSuffixes.isEmpty() + ? Stream.of(p) + : tableSuffixes.stream().map(suffix -> p + suffix)) + .collect(Collectors.joining("|")); + + String exPattern = + exPatternList.isEmpty() + ? "" + : exPatternList.stream() + .flatMap( + p -> + tablePrefixes.isEmpty() + ? Stream.of(p) + : tablePrefixes.stream() + .map(prefix -> prefix + p)) + .flatMap( + p -> + tableSuffixes.isEmpty() + ? Stream.of(p) + : tableSuffixes.stream() + .map(suffix -> p + suffix)) + .collect(Collectors.joining("|")); + + return allTables.stream() + .filter(t -> exPattern.isEmpty() || !t.matches(exPattern)) + .filter(t -> tableWhitelist.contains(t) || t.matches(inPattern)) + .collect(Collectors.toList()); + } +} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java index 1958d15a3fa5..f6cea7f5fb25 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java @@ -82,6 +82,7 @@ class StoreMultiCommitterTest { private Path warehouse; private CatalogLoader catalogLoader; private Catalog catalog; + private String databaseName; private Identifier firstTable; private Identifier secondTable; private Path firstTablePath; @@ -100,7 +101,7 @@ private final void createTestTables(Catalog catalog, Tuple2. public void beforeEach() throws Exception { initialCommitUser = UUID.randomUUID().toString(); warehouse = new Path(TraceableFileIO.SCHEME + "://" + tempDir.toString()); - String databaseName = "test_db"; + databaseName = "test_db"; firstTable = Identifier.create(databaseName, "test_table1"); secondTable = Identifier.create(databaseName, "test_table2"); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/TableFilterTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/TableFilterTest.java new file mode 100644 index 000000000000..10d6834156cc --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/TableFilterTest.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.sink; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for TableFilter. */ +public class TableFilterTest { + + @Test + public void testFilterTables() { + List tableWhitelist = Collections.singletonList("white_listed_table"); + List tablePrefixes = Collections.singletonList("test_"); + List tableSuffixes = Collections.EMPTY_LIST; + String tblIncludingPattern = ".*"; + String tblExcludingPattern = ""; + + // prefix, no suffix, no whitelisting, + List allTables = + Arrays.asList( + "test_table1", + "test_table2", + "test_table1_suffix_in", + "test_table2_suffix_in", + "test_table1_suffix_ex", + "test_table2_suffix_ex", + "test_excluded1", + "test_excluded2", + "white_listed_table", + "other_table1", + "other_table2"); + + TableFilter tableFilter = + new TableFilter( + "", + tableWhitelist, + tablePrefixes, + tableSuffixes, + tblIncludingPattern, + tblExcludingPattern); + + List filteredTables = tableFilter.filterTables(allTables); + assertThat(filteredTables.size()).isEqualTo(9); + assertThat(filteredTables) + .contains( + "test_table1", + "test_table2", + "test_table1_suffix_in", + "test_table2_suffix_in", + "test_table1_suffix_ex", + "test_table2_suffix_ex", + "test_excluded1", + "test_excluded2", + "white_listed_table"); + + // exclude pattern + tblExcludingPattern = "excluded.*"; + tableFilter.setTblExcludingPattern(tblExcludingPattern); + filteredTables = tableFilter.filterTables(allTables); + assertThat(filteredTables.size()).isEqualTo(7); + assertThat(filteredTables) + .contains( + "test_table1", + "test_table2", + "test_table1_suffix_in", + "test_table2_suffix_in", + "test_table1_suffix_ex", + "test_table2_suffix_ex", + "white_listed_table"); + + // suffix + tableSuffixes = Collections.singletonList("_suffix_in"); + tableFilter.setTableSuffixes(tableSuffixes); + filteredTables = tableFilter.filterTables(allTables); + assertThat(filteredTables.size()).isEqualTo(3); + assertThat(filteredTables) + .contains("test_table1_suffix_in", "test_table2_suffix_in", "white_listed_table"); + + // No tables + filteredTables = tableFilter.filterTables(Collections.emptyList()); + assertThat(filteredTables.size()).isEqualTo(0); + } +}