Skip to content

Commit c455db2

Browse files
authored
[clone] support meta only in clone action (apache#6967)
1 parent 6c1a0c5 commit c455db2

File tree

6 files changed: 99 additions and 8 deletions.

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneAction.java

Lines changed: 8 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -48,6 +48,7 @@ public class CloneAction extends ActionBase {
4848
@Nullable private final List<String> excludedTables;
4949
@Nullable private final String preferFileFormat;
5050
private final String cloneFrom;
51+
private final boolean metaOnly;
5152

5253
public CloneAction(
5354
String sourceDatabase,
@@ -61,7 +62,8 @@ public CloneAction(
6162
@Nullable List<String> includedTables,
6263
@Nullable List<String> excludedTables,
6364
@Nullable String preferFileFormat,
64-
String cloneFrom) {
65+
String cloneFrom,
66+
boolean metaOnly) {
6567
super(sourceCatalogConfig);
6668

6769
if (cloneFrom.equalsIgnoreCase("hive")) {
@@ -94,6 +96,7 @@ public CloneAction(
9496
? preferFileFormat
9597
: preferFileFormat.toLowerCase();
9698
this.cloneFrom = cloneFrom;
99+
this.metaOnly = metaOnly;
97100
}
98101

99102
@Override
@@ -113,7 +116,8 @@ public void build() throws Exception {
113116
whereSql,
114117
includedTables,
115118
excludedTables,
116-
preferFileFormat);
119+
preferFileFormat,
120+
metaOnly);
117121
break;
118122
case "paimon":
119123
ClonePaimonTableUtils.build(
@@ -129,7 +133,8 @@ public void build() throws Exception {
129133
whereSql,
130134
includedTables,
131135
excludedTables,
132-
preferFileFormat);
136+
preferFileFormat,
137+
metaOnly);
133138
break;
134139
}
135140
}

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneActionFactory.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ public class CloneActionFactory implements ActionFactory {
4040
private static final String EXCLUDED_TABLES = "excluded_tables";
4141
private static final String PREFER_FILE_FORMAT = "prefer_file_format";
4242
private static final String CLONE_FROM = "clone_from";
43+
private static final String META_ONLY = "meta_only";
4344

4445
@Override
4546
public String identifier() {
@@ -76,6 +77,12 @@ public Optional<Action> create(MultipleParameterToolAdapter params) {
7677
cloneFrom = "hive";
7778
}
7879
String preferFileFormat = params.get(PREFER_FILE_FORMAT);
80+
81+
String metaOnlyStr = params.get(META_ONLY);
82+
boolean metaOnly =
83+
!StringUtils.isNullOrWhitespaceOnly(metaOnlyStr)
84+
&& Boolean.parseBoolean(metaOnlyStr);
85+
7986
CloneAction cloneAction =
8087
new CloneAction(
8188
params.get(DATABASE),
@@ -89,7 +96,8 @@ public Optional<Action> create(MultipleParameterToolAdapter params) {
8996
includedTables,
9097
excludedTables,
9198
preferFileFormat,
92-
cloneFrom);
99+
cloneFrom,
100+
metaOnly);
93101

94102
return Optional.of(cloneAction);
95103
}

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneHiveTableUtils.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,8 @@ public static void build(
156156
@Nullable String whereSql,
157157
@Nullable List<String> includedTables,
158158
@Nullable List<String> excludedTables,
159-
@Nullable String preferFileFormat)
159+
@Nullable String preferFileFormat,
160+
boolean metaOnly)
160161
throws Exception {
161162
// list source tables
162163
DataStream<Tuple2<Identifier, Identifier>> source =
@@ -183,6 +184,12 @@ public static void build(
183184
.name("Clone Schema")
184185
.setParallelism(parallelism);
185186

187+
// if metaOnly is true, only clone schema and skip data cloning
188+
if (metaOnly) {
189+
schemaInfos.sinkTo(new DiscardingSink<>()).name("end").setParallelism(1);
190+
return;
191+
}
192+
186193
buildForCloneSplits(
187194
sourceCatalogConfig, targetCatalogConfig, parallelism, whereSql, schemaInfos);
188195

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/ClonePaimonTableUtils.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,8 @@ public static void build(
138138
@Nullable String whereSql,
139139
@Nullable List<String> includedTables,
140140
@Nullable List<String> excludedTables,
141-
@Nullable String preferFileFormat)
141+
@Nullable String preferFileFormat,
142+
boolean metaOnly)
142143
throws Exception {
143144
// list source tables
144145
DataStream<Tuple2<Identifier, Identifier>> source =
@@ -165,6 +166,12 @@ public static void build(
165166
.name("Clone Schema")
166167
.setParallelism(parallelism);
167168

169+
// if metaOnly is true, only clone schema and skip data cloning
170+
if (metaOnly) {
171+
schemaInfos.sinkTo(new DiscardingSink<>()).name("end").setParallelism(1);
172+
return;
173+
}
174+
168175
// list splits
169176
DataStream<CloneSplitInfo> splits =
170177
schemaInfos

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CloneProcedure.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,10 @@ public class CloneProcedure extends ProcedureBase {
7373
@ArgumentHint(
7474
name = "clone_from",
7575
type = @DataTypeHint("STRING"),
76+
isOptional = true),
77+
@ArgumentHint(
78+
name = "meta_only",
79+
type = @DataTypeHint("BOOLEAN"),
7680
isOptional = true)
7781
})
7882
public String[] call(
@@ -88,7 +92,8 @@ public String[] call(
8892
String includedTablesStr,
8993
String excludedTablesStr,
9094
String preferFileFormat,
91-
String cloneFrom)
95+
String cloneFrom,
96+
Boolean metaOnly)
9297
throws Exception {
9398
Map<String, String> sourceCatalogConfig =
9499
new HashMap<>(optionalConfigMap(sourceCatalogConfigStr));
@@ -118,7 +123,8 @@ public String[] call(
118123
includedTables,
119124
excludedTables,
120125
preferFileFormat,
121-
cloneFrom);
126+
cloneFrom,
127+
metaOnly != null && metaOnly);
122128
return execute(procedureContext, action, "Clone Job");
123129
}
124130

paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneActionITCase.java

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1012,6 +1012,64 @@ public void testMigrateWithPreferFileFormat() throws Exception {
10121012
Assertions.assertThatList(r1).containsExactlyInAnyOrderElementsOf(r2);
10131013
}
10141014

1015+
@Test
1016+
public void testMigrateWithMetaOnly() throws Exception {
1017+
String format = "avro";
1018+
String dbName = "hivedb" + StringUtils.randomNumericString(10);
1019+
String tableName = "hivetable" + StringUtils.randomNumericString(10);
1020+
1021+
TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
1022+
tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')");
1023+
tEnv.useCatalog("HIVE");
1024+
tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
1025+
tEnv.executeSql("CREATE DATABASE " + dbName);
1026+
sql(
1027+
tEnv,
1028+
"CREATE TABLE %s.%s (id STRING) PARTITIONED BY (id2 INT, id3 INT)"
1029+
+ "STORED AS %s ",
1030+
dbName,
1031+
tableName,
1032+
format);
1033+
sql(tEnv, "INSERT INTO %s.%s VALUES %s", dbName, tableName, data(100));
1034+
1035+
tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
1036+
tEnv.executeSql("CREATE CATALOG PAIMON_GE WITH ('type'='paimon-generic')");
1037+
tEnv.useCatalog("PAIMON_GE");
1038+
1039+
tEnv.executeSql(
1040+
"CREATE CATALOG PAIMON WITH ('type'='paimon', 'warehouse' = '" + warehouse + "')");
1041+
tEnv.useCatalog("PAIMON");
1042+
tEnv.executeSql("CREATE DATABASE test");
1043+
1044+
List<String> args =
1045+
new ArrayList<>(
1046+
Arrays.asList(
1047+
"clone",
1048+
"--database",
1049+
dbName,
1050+
"--table",
1051+
tableName,
1052+
"--catalog_conf",
1053+
"metastore=hive",
1054+
"--catalog_conf",
1055+
"uri=thrift://localhost:" + PORT,
1056+
"--target_database",
1057+
"test",
1058+
"--target_table",
1059+
"test_table",
1060+
"--target_catalog_conf",
1061+
"warehouse=" + warehouse,
1062+
"--meta_only",
1063+
"true"));
1064+
1065+
createAction(CloneAction.class, args).run();
1066+
FileStoreTable paimonTable =
1067+
paimonTable(tEnv, "PAIMON", Identifier.create("test", "test_table"));
1068+
// table exists but no data
1069+
assertThat(paimonTable.schema().fieldNames()).containsExactly("id", "id2", "id3");
1070+
assertThat(paimonTable.snapshotManager().earliestSnapshot()).isNull();
1071+
}
1072+
10151073
private String[] ddls(String format) {
10161074
// has primary key
10171075
String ddl0 =

0 commit comments

Comments
 (0)