docs/layouts/shortcodes/generated/kafka_sync_database.html (5 additions, 0 deletions)
@@ -90,6 +90,11 @@
</ul>
</td>
</tr>
<tr>
<td><h5>--eager_init</h5></td>
<td>Defaults to false. If true, the committers of all relevant tables are initialized eagerly, which means those tables can be forced to create snapshots even before any records arrive.</td>
</tr>
<tr>
<td><h5>--partition_keys</h5></td>
<td>The partition keys for Paimon table. If there are multiple partition keys, connect them with comma, for example "dt,hh,mm".
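Note on the default: the action factory below parses this flag from the raw parameter string, and Boolean parsing of a missing (null) value yields false, so simply omitting --eager_init leaves eager initialization disabled. A minimal standalone check (class name is illustrative):

public class EagerInitDefaultCheck {
    public static void main(String[] args) {
        // params.get(EAGER_INIT) returns null when --eager_init is absent;
        // Boolean.parseBoolean(null) evaluates to false, keeping the default.
        String absent = null;
        System.out.println(Boolean.parseBoolean(absent)); // false
        System.out.println(Boolean.parseBoolean("true")); // true
    }
}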
@@ -69,6 +69,7 @@ public class CdcActionCommonUtils {
public static final String COMPUTED_COLUMN = "computed_column";
public static final String METADATA_COLUMN = "metadata_column";
public static final String MULTIPLE_TABLE_PARTITION_KEYS = "multiple_table_partition_keys";
public static final String EAGER_INIT = "eager_init";

public static void assertSchemaCompatible(
TableSchema paimonSchema, List<DataField> sourceTableFields) {
@@ -21,6 +21,7 @@
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.flink.action.Action;
import org.apache.paimon.flink.action.MultiTablesSinkMode;
import org.apache.paimon.flink.sink.TableFilter;
import org.apache.paimon.flink.sink.cdc.EventParser;
import org.apache.paimon.flink.sink.cdc.FlinkCdcSyncDatabaseSinkBuilder;
import org.apache.paimon.flink.sink.cdc.NewTableSchemaBuilder;
@@ -48,6 +49,7 @@
/** Base {@link Action} for synchronizing into one Paimon database. */
public abstract class SyncDatabaseActionBase extends SynchronizationActionBase {

protected boolean eagerInit = false;
protected boolean mergeShards = true;
protected MultiTablesSinkMode mode = COMBINED;
protected String tablePrefix = "";
@@ -81,6 +83,11 @@ public SyncDatabaseActionBase mergeShards(boolean mergeShards) {
return this;
}

public SyncDatabaseActionBase eagerInit(boolean eagerInit) {
this.eagerInit = eagerInit;
return this;
}

public SyncDatabaseActionBase withMode(MultiTablesSinkMode mode) {
this.mode = mode;
return this;
@@ -227,6 +234,13 @@ protected EventParser.Factory<RichCdcMultiplexRecord> buildEventParserFactory()
protected void buildSink(
DataStream<RichCdcMultiplexRecord> input,
EventParser.Factory<RichCdcMultiplexRecord> parserFactory) {

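// Collect the explicitly mapped Paimon table names plus all per-database
// and global prefixes/suffixes; the TableFilter below uses them to decide
// which tables qualify for eager committer initialization.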
List<String> whiteList = new ArrayList<>(tableMapping.values());
List<String> prefixList = new ArrayList<>(dbPrefix.values());
prefixList.add(tablePrefix);
List<String> suffixList = new ArrayList<>(dbSuffix.values());
suffixList.add(tableSuffix);

new FlinkCdcSyncDatabaseSinkBuilder<RichCdcMultiplexRecord>()
.withInput(input)
.withParserFactory(parserFactory)
@@ -236,6 +250,15 @@ protected void buildSink(
.withTables(tables)
.withMode(mode)
.withTableOptions(tableConfig)
.withEagerInit(eagerInit)
.withTableFilter(
new TableFilter(
database,
whiteList,
prefixList,
suffixList,
includingTables,
excludingTables))
.build();
}
}
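TableFilter is introduced by this PR, but its implementation is not part of the shown diff. The following is a minimal sketch of the contract implied by its call sites: the constructor argument order comes from buildSink above, and getDbName()/filterTables() come from StoreMultiCommitter below. The matching rules are an assumption: a table qualifies if it is an explicitly mapped name or carries one of the known prefixes/suffixes (the default empty prefix/suffix then matches every name, which fits combined mode syncing a whole database), and it must additionally survive the regex including/excluding patterns.

import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Sketch only; the real org.apache.paimon.flink.sink.TableFilter may differ.
public class TableFilterSketch {

    private final String dbName;
    private final List<String> whiteList;  // explicitly mapped table names
    private final List<String> prefixList; // per-database and global table prefixes
    private final List<String> suffixList; // per-database and global table suffixes
    private final Pattern including;       // null means "include all"
    private final Pattern excluding;       // null means "exclude none"

    public TableFilterSketch(
            String dbName,
            List<String> whiteList,
            List<String> prefixList,
            List<String> suffixList,
            String includingTables,
            String excludingTables) {
        this.dbName = dbName;
        this.whiteList = whiteList;
        this.prefixList = prefixList;
        this.suffixList = suffixList;
        this.including = includingTables == null ? null : Pattern.compile(includingTables);
        this.excluding = excludingTables == null ? null : Pattern.compile(excludingTables);
    }

    public String getDbName() {
        return dbName;
    }

    public List<String> filterTables(List<String> allTables) {
        return allTables.stream()
                .filter(
                        t ->
                                whiteList.contains(t)
                                        || prefixList.stream().anyMatch(t::startsWith)
                                        || suffixList.stream().anyMatch(t::endsWith))
                .filter(t -> including == null || including.matcher(t).matches())
                .filter(t -> excluding == null || !excluding.matcher(t).matches())
                .collect(Collectors.toList());
    }
}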
@@ -24,6 +24,7 @@

import java.util.Optional;

import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.EAGER_INIT;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.EXCLUDING_DBS;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.EXCLUDING_TABLES;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.INCLUDING_DBS;
@@ -63,7 +64,8 @@ protected void withParams(MultipleParameterToolAdapter params, T action) {
.excludingDbs(params.get(EXCLUDING_DBS))
.withPartitionKeyMultiple(
optionalConfigMapList(params, MULTIPLE_TABLE_PARTITION_KEYS))
.withPartitionKeys();
.withPartitionKeys()
.eagerInit(Boolean.parseBoolean(params.get(EAGER_INIT)));

if (params.has(PARTITION_KEYS)) {
action.withPartitionKeys(params.get(PARTITION_KEYS).split(","));
@@ -31,6 +31,7 @@
import org.apache.paimon.flink.sink.StoreMultiCommitter;
import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.flink.sink.StoreSinkWriteImpl;
import org.apache.paimon.flink.sink.TableFilter;
import org.apache.paimon.flink.sink.WrappedManifestCommittableSerializer;
import org.apache.paimon.manifest.WrappedManifestCommittable;
import org.apache.paimon.options.MemorySize;
@@ -46,6 +47,7 @@
import javax.annotation.Nullable;

import java.io.Serializable;
import java.util.Collections;

import static org.apache.paimon.flink.sink.FlinkSink.assertStreamingConfiguration;
import static org.apache.paimon.flink.sink.FlinkSink.configureGlobalCommitter;
@@ -65,16 +67,22 @@ public class FlinkCdcMultiTableSink implements Serializable {
private final double commitCpuCores;
@Nullable private final MemorySize commitHeapMemory;
private final String commitUser;
private final boolean eagerInit;
private final TableFilter tableFilter;

public FlinkCdcMultiTableSink(
CatalogLoader catalogLoader,
double commitCpuCores,
@Nullable MemorySize commitHeapMemory,
String commitUser) {
String commitUser,
boolean eagerInit,
TableFilter tableFilter) {
this.catalogLoader = catalogLoader;
this.commitCpuCores = commitCpuCores;
this.commitHeapMemory = commitHeapMemory;
this.commitUser = commitUser;
this.eagerInit = eagerInit;
this.tableFilter = tableFilter;
}

private StoreSinkWrite.WithWriteBufferProvider createWriteProvider() {
@@ -128,7 +136,7 @@ public DataStreamSink<?> sinkFrom(
true,
false,
commitUser,
createCommitterFactory(),
createCommitterFactory(tableFilter),
createCommittableStateManager()));
forwardParallelism(committed, input);
configureGlobalCommitter(committed, commitCpuCores, commitHeapMemory);
@@ -144,12 +152,20 @@

// Table committers are created dynamically at runtime, unless eagerInit forces them to be created up front
protected Committer.Factory<MultiTableCommittable, WrappedManifestCommittable>
createCommitterFactory() {
createCommitterFactory(TableFilter tableFilter) {

// If checkpointing is enabled for the streaming job, we have to
// commit the new files list even if it is empty.
// Otherwise we cannot tell whether the commit succeeded after
// a restart.
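// When eagerInit is set, committers for all filtered tables are
// created immediately instead of on each table's first record.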
return context -> new StoreMultiCommitter(catalogLoader, context);
return context ->
new StoreMultiCommitter(
catalogLoader,
context,
false,
Collections.emptyMap(),
eagerInit,
tableFilter);
}

protected CommittableStateManager<WrappedManifestCommittable> createCommittableStateManager() {
@@ -24,6 +24,7 @@
import org.apache.paimon.flink.action.MultiTablesSinkMode;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.sink.FlinkWriteSink;
import org.apache.paimon.flink.sink.TableFilter;
import org.apache.paimon.flink.utils.SingleOutputStreamOperatorUtils;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
@@ -79,8 +80,10 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {

// database to sync; currently only a single database is supported
private String database;
private boolean eagerInit;
private MultiTablesSinkMode mode;
private String commitUser;
private TableFilter tableFilter;

public FlinkCdcSyncDatabaseSinkBuilder<T> withInput(DataStream<T> input) {
this.input = input;
@@ -125,6 +128,16 @@ public FlinkCdcSyncDatabaseSinkBuilder<T> withMode(MultiTablesSinkMode mode) {
return this;
}

public FlinkCdcSyncDatabaseSinkBuilder<T> withEagerInit(boolean eagerInit) {
this.eagerInit = eagerInit;
return this;
}

public FlinkCdcSyncDatabaseSinkBuilder<T> withTableFilter(TableFilter tableFilter) {
this.tableFilter = tableFilter;
return this;
}

public FlinkCdcSyncDatabaseSinkBuilder<T> withTypeMapping(TypeMapping typeMapping) {
this.typeMapping = typeMapping;
return this;
@@ -176,7 +189,12 @@ private void buildCombinedCdcSink() {

FlinkCdcMultiTableSink sink =
new FlinkCdcMultiTableSink(
catalogLoader, committerCpu, committerMemory, commitUser);
catalogLoader,
committerCpu,
committerMemory,
commitUser,
eagerInit,
tableFilter);
sink.sinkFrom(partitioned);
}
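From this diff, eagerInit and tableFilter are only threaded through buildCombinedCdcSink, so eager committer initialization appears to apply to the combined sink mode; the divided-mode path is unchanged here.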

@@ -52,7 +52,9 @@ public void testTransformationParallelism() {
() -> FlinkCatalogFactory.createPaimonCatalog(new Options()),
FlinkConnectorOptions.SINK_COMMITTER_CPU.defaultValue(),
null,
UUID.randomUUID().toString());
UUID.randomUUID().toString(),
false,
null);
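Note that the test passes eagerInit = false together with a null TableFilter; this is safe because filterTables() is only reached when eagerInit is true, so the null filter is never dereferenced.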
DataStreamSink<?> dataStreamSink = sink.sinkFrom(input);

// check the transformation graph
@@ -57,6 +57,8 @@ public class StoreMultiCommitter
private final boolean ignoreEmptyCommit;
private final Map<String, String> dynamicOptions;

private final TableFilter tableFilter;

public StoreMultiCommitter(CatalogLoader catalogLoader, Context context) {
this(catalogLoader, context, false, Collections.emptyMap());
}
@@ -66,11 +68,28 @@ public StoreMultiCommitter(
Context context,
boolean ignoreEmptyCommit,
Map<String, String> dynamicOptions) {
this(catalogLoader, context, ignoreEmptyCommit, dynamicOptions, false, null);
}

public StoreMultiCommitter(
CatalogLoader catalogLoader,
Context context,
boolean ignoreEmptyCommit,
Map<String, String> dynamicOptions,
boolean eagerInit,
TableFilter tableFilter) {
this.catalog = catalogLoader.load();
this.context = context;
this.ignoreEmptyCommit = ignoreEmptyCommit;
this.dynamicOptions = dynamicOptions;
this.tableCommitters = new HashMap<>();
this.tableFilter = tableFilter;

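// Eagerly create committers for all tables that pass the filter so they
// can produce (possibly empty) snapshots from the very first checkpoint.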
if (eagerInit) {
filterTables().forEach(this::getStoreCommitter);
}
}

@Override
@@ -218,4 +237,19 @@ public void close() throws Exception {
catalog.close();
}
}

private List<Identifier> filterTables() {
    // List the tables of the filter's target database; a missing
    // database simply yields nothing to initialize eagerly.
    List<String> allTables;
    try {
        allTables = catalog.listTables(tableFilter.getDbName());
    } catch (Catalog.DatabaseNotExistException e) {
        allTables = Collections.emptyList();
    }

    return tableFilter.filterTables(allTables).stream()
            .map(t -> Identifier.create(tableFilter.getDbName(), t))
            .collect(Collectors.toList());
}
}
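For intuition, running the TableFilterSketch shown earlier on an illustrative listing (all names hypothetical):

import java.util.Arrays;
import java.util.Collections;

public class TableFilterSketchDemo {
    public static void main(String[] args) {
        // Whitelist "ods_orders", prefix "t_", no suffixes,
        // include everything, exclude temporary tables.
        TableFilterSketch filter =
                new TableFilterSketch(
                        "db",
                        Arrays.asList("ods_orders"),
                        Arrays.asList("t_"),
                        Collections.emptyList(),
                        ".*",
                        "t_tmp.*");
        // Prints [ods_orders, t_user]: "t_tmp_1" is excluded by the pattern,
        // and "other" matches neither the whitelist nor any prefix/suffix.
        System.out.println(
                filter.filterTables(Arrays.asList("ods_orders", "t_user", "t_tmp_1", "other")));
    }
}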