diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneAction.java index 0280e9badf1e..fe283fe8f417 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneAction.java @@ -48,6 +48,7 @@ public class CloneAction extends ActionBase { @Nullable private final List excludedTables; @Nullable private final String preferFileFormat; private final String cloneFrom; + private final boolean metaOnly; public CloneAction( String sourceDatabase, @@ -61,7 +62,8 @@ public CloneAction( @Nullable List includedTables, @Nullable List excludedTables, @Nullable String preferFileFormat, - String cloneFrom) { + String cloneFrom, + boolean metaOnly) { super(sourceCatalogConfig); if (cloneFrom.equalsIgnoreCase("hive")) { @@ -94,6 +96,7 @@ public CloneAction( ? 
preferFileFormat : preferFileFormat.toLowerCase(); this.cloneFrom = cloneFrom; + this.metaOnly = metaOnly; } @Override @@ -113,7 +116,8 @@ public void build() throws Exception { whereSql, includedTables, excludedTables, - preferFileFormat); + preferFileFormat, + metaOnly); break; case "paimon": ClonePaimonTableUtils.build( @@ -129,7 +133,8 @@ public void build() throws Exception { whereSql, includedTables, excludedTables, - preferFileFormat); + preferFileFormat, + metaOnly); break; } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneActionFactory.java index c0258ec70f15..5d884bd2d97b 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneActionFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneActionFactory.java @@ -40,6 +40,7 @@ public class CloneActionFactory implements ActionFactory { private static final String EXCLUDED_TABLES = "excluded_tables"; private static final String PREFER_FILE_FORMAT = "prefer_file_format"; private static final String CLONE_FROM = "clone_from"; + private static final String META_ONLY = "meta_only"; @Override public String identifier() { @@ -76,6 +77,12 @@ public Optional create(MultipleParameterToolAdapter params) { cloneFrom = "hive"; } String preferFileFormat = params.get(PREFER_FILE_FORMAT); + + String metaOnlyStr = params.get(META_ONLY); + boolean metaOnly = + !StringUtils.isNullOrWhitespaceOnly(metaOnlyStr) + && Boolean.parseBoolean(metaOnlyStr); + CloneAction cloneAction = new CloneAction( params.get(DATABASE), @@ -89,7 +96,8 @@ public Optional create(MultipleParameterToolAdapter params) { includedTables, excludedTables, preferFileFormat, - cloneFrom); + cloneFrom, + metaOnly); return Optional.of(cloneAction); } diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneHiveTableUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneHiveTableUtils.java index c32c9b53c548..b6298404a353 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneHiveTableUtils.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneHiveTableUtils.java @@ -156,7 +156,8 @@ public static void build( @Nullable String whereSql, @Nullable List includedTables, @Nullable List excludedTables, - @Nullable String preferFileFormat) + @Nullable String preferFileFormat, + boolean metaOnly) throws Exception { // list source tables DataStream> source = @@ -183,6 +184,12 @@ public static void build( .name("Clone Schema") .setParallelism(parallelism); + // if metaOnly is true, only clone schema and skip data cloning + if (metaOnly) { + schemaInfos.sinkTo(new DiscardingSink<>()).name("end").setParallelism(1); + return; + } + buildForCloneSplits( sourceCatalogConfig, targetCatalogConfig, parallelism, whereSql, schemaInfos); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/ClonePaimonTableUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/ClonePaimonTableUtils.java index a4984e69ed6f..1bce445a4587 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/ClonePaimonTableUtils.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/ClonePaimonTableUtils.java @@ -138,7 +138,8 @@ public static void build( @Nullable String whereSql, @Nullable List includedTables, @Nullable List excludedTables, - @Nullable String preferFileFormat) + @Nullable String preferFileFormat, + boolean metaOnly) throws Exception { // list source tables DataStream> source = @@ -165,6 +166,12 @@ public static void build( .name("Clone Schema") 
.setParallelism(parallelism); + // if metaOnly is true, only clone schema and skip data cloning + if (metaOnly) { + schemaInfos.sinkTo(new DiscardingSink<>()).name("end").setParallelism(1); + return; + } + // list splits DataStream splits = schemaInfos diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CloneProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CloneProcedure.java index 1f60707c7c1a..4daeb2fef7a6 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CloneProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CloneProcedure.java @@ -73,6 +73,10 @@ public class CloneProcedure extends ProcedureBase { @ArgumentHint( name = "clone_from", type = @DataTypeHint("STRING"), + isOptional = true), + @ArgumentHint( + name = "meta_only", + type = @DataTypeHint("BOOLEAN"), isOptional = true) }) public String[] call( @@ -88,7 +92,8 @@ public String[] call( String includedTablesStr, String excludedTablesStr, String preferFileFormat, - String cloneFrom) + String cloneFrom, + Boolean metaOnly) throws Exception { Map sourceCatalogConfig = new HashMap<>(optionalConfigMap(sourceCatalogConfigStr)); @@ -118,7 +123,8 @@ public String[] call( includedTables, excludedTables, preferFileFormat, - cloneFrom); + cloneFrom, + metaOnly != null && metaOnly); return execute(procedureContext, action, "Clone Job"); } diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneActionITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneActionITCase.java index 01640c6623d2..943732f25668 100644 --- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneActionITCase.java +++ 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneActionITCase.java
@@ -1012,6 +1012,70 @@ public void testMigrateWithPreferFileFormat() throws Exception {
         Assertions.assertThatList(r1).containsExactlyInAnyOrderElementsOf(r2);
     }
 
+    /**
+     * Verifies that running the clone action with {@code --meta_only true} on a Hive table
+     * creates the target Paimon table with the source field names but leaves it without a snapshot.
+     */
+    @Test
+    public void testMigrateWithMetaOnly() throws Exception {
+        String format = "avro";
+        String dbName = "hivedb" + StringUtils.randomNumericString(10);
+        String tableName = "hivetable" + StringUtils.randomNumericString(10);
+
+        // create and populate a partitioned source table through the Hive catalog
+        TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
+        tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')");
+        tEnv.useCatalog("HIVE");
+        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
+        tEnv.executeSql("CREATE DATABASE " + dbName);
+        sql(
+                tEnv,
+                "CREATE TABLE %s.%s (id STRING) PARTITIONED BY (id2 INT, id3 INT)"
+                        + "STORED AS %s ",
+                dbName,
+                tableName,
+                format);
+        sql(tEnv, "INSERT INTO %s.%s VALUES %s", dbName, tableName, data(100));
+
+        tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
+        tEnv.executeSql("CREATE CATALOG PAIMON_GE WITH ('type'='paimon-generic')");
+        tEnv.useCatalog("PAIMON_GE");
+
+        // target Paimon catalog and database that the clone writes into
+        tEnv.executeSql(
+                "CREATE CATALOG PAIMON WITH ('type'='paimon', 'warehouse' = '" + warehouse + "')");
+        tEnv.useCatalog("PAIMON");
+        tEnv.executeSql("CREATE DATABASE test");
+
+        List<String> args =
+                new ArrayList<>(
+                        Arrays.asList(
+                                "clone",
+                                "--database",
+                                dbName,
+                                "--table",
+                                tableName,
+                                "--catalog_conf",
+                                "metastore=hive",
+                                "--catalog_conf",
+                                "uri=thrift://localhost:" + PORT,
+                                "--target_database",
+                                "test",
+                                "--target_table",
+                                "test_table",
+                                "--target_catalog_conf",
+                                "warehouse=" + warehouse,
+                                "--meta_only",
+                                "true"));
+
+        createAction(CloneAction.class, args).run();
+        FileStoreTable paimonTable =
+                paimonTable(tEnv, "PAIMON", Identifier.create("test", "test_table"));
+        // table exists but no data
+        assertThat(paimonTable.schema().fieldNames()).containsExactly("id", "id2", "id3");
+        assertThat(paimonTable.snapshotManager().earliestSnapshot()).isNull();
+    }
+
     private String[] ddls(String format) {
         // has primary key
         String ddl0 =