Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class CloneAction extends ActionBase {
@Nullable private final List<String> excludedTables;
@Nullable private final String preferFileFormat;
private final String cloneFrom;
private final boolean metaOnly;

public CloneAction(
String sourceDatabase,
Expand All @@ -61,7 +62,8 @@ public CloneAction(
@Nullable List<String> includedTables,
@Nullable List<String> excludedTables,
@Nullable String preferFileFormat,
String cloneFrom) {
String cloneFrom,
boolean metaOnly) {
super(sourceCatalogConfig);

if (cloneFrom.equalsIgnoreCase("hive")) {
Expand Down Expand Up @@ -94,6 +96,7 @@ public CloneAction(
? preferFileFormat
: preferFileFormat.toLowerCase();
this.cloneFrom = cloneFrom;
this.metaOnly = metaOnly;
}

@Override
Expand All @@ -113,7 +116,8 @@ public void build() throws Exception {
whereSql,
includedTables,
excludedTables,
preferFileFormat);
preferFileFormat,
metaOnly);
break;
case "paimon":
ClonePaimonTableUtils.build(
Expand All @@ -129,7 +133,8 @@ public void build() throws Exception {
whereSql,
includedTables,
excludedTables,
preferFileFormat);
preferFileFormat,
metaOnly);
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class CloneActionFactory implements ActionFactory {
private static final String EXCLUDED_TABLES = "excluded_tables";
private static final String PREFER_FILE_FORMAT = "prefer_file_format";
private static final String CLONE_FROM = "clone_from";
private static final String META_ONLY = "meta_only";

@Override
public String identifier() {
Expand Down Expand Up @@ -76,6 +77,12 @@ public Optional<Action> create(MultipleParameterToolAdapter params) {
cloneFrom = "hive";
}
String preferFileFormat = params.get(PREFER_FILE_FORMAT);

String metaOnlyStr = params.get(META_ONLY);
boolean metaOnly =
!StringUtils.isNullOrWhitespaceOnly(metaOnlyStr)
&& Boolean.parseBoolean(metaOnlyStr);

CloneAction cloneAction =
new CloneAction(
params.get(DATABASE),
Expand All @@ -89,7 +96,8 @@ public Optional<Action> create(MultipleParameterToolAdapter params) {
includedTables,
excludedTables,
preferFileFormat,
cloneFrom);
cloneFrom,
metaOnly);

return Optional.of(cloneAction);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,8 @@ public static void build(
@Nullable String whereSql,
@Nullable List<String> includedTables,
@Nullable List<String> excludedTables,
@Nullable String preferFileFormat)
@Nullable String preferFileFormat,
boolean metaOnly)
throws Exception {
// list source tables
DataStream<Tuple2<Identifier, Identifier>> source =
Expand All @@ -183,6 +184,12 @@ public static void build(
.name("Clone Schema")
.setParallelism(parallelism);

// if metaOnly is true, only clone schema and skip data cloning
if (metaOnly) {
schemaInfos.sinkTo(new DiscardingSink<>()).name("end").setParallelism(1);
return;
}

buildForCloneSplits(
sourceCatalogConfig, targetCatalogConfig, parallelism, whereSql, schemaInfos);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,8 @@ public static void build(
@Nullable String whereSql,
@Nullable List<String> includedTables,
@Nullable List<String> excludedTables,
@Nullable String preferFileFormat)
@Nullable String preferFileFormat,
boolean metaOnly)
throws Exception {
// list source tables
DataStream<Tuple2<Identifier, Identifier>> source =
Expand All @@ -165,6 +166,12 @@ public static void build(
.name("Clone Schema")
.setParallelism(parallelism);

// if metaOnly is true, only clone schema and skip data cloning
if (metaOnly) {
schemaInfos.sinkTo(new DiscardingSink<>()).name("end").setParallelism(1);
return;
}

// list splits
DataStream<CloneSplitInfo> splits =
schemaInfos
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ public class CloneProcedure extends ProcedureBase {
@ArgumentHint(
name = "clone_from",
type = @DataTypeHint("STRING"),
isOptional = true),
@ArgumentHint(
name = "meta_only",
type = @DataTypeHint("BOOLEAN"),
isOptional = true)
})
public String[] call(
Expand All @@ -88,7 +92,8 @@ public String[] call(
String includedTablesStr,
String excludedTablesStr,
String preferFileFormat,
String cloneFrom)
String cloneFrom,
Boolean metaOnly)
throws Exception {
Map<String, String> sourceCatalogConfig =
new HashMap<>(optionalConfigMap(sourceCatalogConfigStr));
Expand Down Expand Up @@ -118,7 +123,8 @@ public String[] call(
includedTables,
excludedTables,
preferFileFormat,
cloneFrom);
cloneFrom,
metaOnly != null && metaOnly);
return execute(procedureContext, action, "Clone Job");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1012,6 +1012,64 @@ public void testMigrateWithPreferFileFormat() throws Exception {
Assertions.assertThatList(r1).containsExactlyInAnyOrderElementsOf(r2);
}

@Test
public void testMigrateWithMetaOnly() throws Exception {
String format = "avro";
String dbName = "hivedb" + StringUtils.randomNumericString(10);
String tableName = "hivetable" + StringUtils.randomNumericString(10);

TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')");
tEnv.useCatalog("HIVE");
tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tEnv.executeSql("CREATE DATABASE " + dbName);
sql(
tEnv,
"CREATE TABLE %s.%s (id STRING) PARTITIONED BY (id2 INT, id3 INT)"
+ "STORED AS %s ",
dbName,
tableName,
format);
sql(tEnv, "INSERT INTO %s.%s VALUES %s", dbName, tableName, data(100));

tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
tEnv.executeSql("CREATE CATALOG PAIMON_GE WITH ('type'='paimon-generic')");
tEnv.useCatalog("PAIMON_GE");

tEnv.executeSql(
"CREATE CATALOG PAIMON WITH ('type'='paimon', 'warehouse' = '" + warehouse + "')");
tEnv.useCatalog("PAIMON");
tEnv.executeSql("CREATE DATABASE test");

List<String> args =
new ArrayList<>(
Arrays.asList(
"clone",
"--database",
dbName,
"--table",
tableName,
"--catalog_conf",
"metastore=hive",
"--catalog_conf",
"uri=thrift://localhost:" + PORT,
"--target_database",
"test",
"--target_table",
"test_table",
"--target_catalog_conf",
"warehouse=" + warehouse,
"--meta_only",
"true"));

createAction(CloneAction.class, args).run();
FileStoreTable paimonTable =
paimonTable(tEnv, "PAIMON", Identifier.create("test", "test_table"));
// table exists but no data
assertThat(paimonTable.schema().fieldNames()).containsExactly("id", "id2", "id3");
assertThat(paimonTable.snapshotManager().earliestSnapshot()).isNull();
}

private String[] ddls(String format) {
// has primary key
String ddl0 =
Expand Down
Loading