Skip to content

Commit 983a552

Browse files
authored
[hive] HiveMigrator support set parallelism for procedures (#4177)
1 parent 647865f commit 983a552

File tree

23 files changed

+259
-35
lines changed

23 files changed

+259
-35
lines changed

docs/content/spark/procedures.md

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -139,9 +139,10 @@ This section introduce all available spark procedures about paimon.
139139
<li>options: the table options of the paimon table to migrate.</li>
140140
<li>target_table: name of the target paimon table to migrate. If not set would keep the same name with origin table</li>
141141
<li>delete_origin: If had set target_table, can set delete_origin to decide whether delete the origin table metadata from hms after migrate. Default is true</li>
142-
<li>options_map: Options map for adding key-value options which is a map.</li>
142+
<li>options_map: Options map for adding key-value options which is a map.</li>
143+
<li>parallelism: the parallelism for migrate process, default is core numbers of machine.</li>
143144
</td>
144-
<td>CALL sys.migrate_table(source_type => 'hive', table => 'default.T', options => 'file.format=parquet', options_map => map('k1','v1'))</td>
145+
<td>CALL sys.migrate_table(source_type => 'hive', table => 'default.T', options => 'file.format=parquet', options_map => map('k1','v1'), parallelism => 6)</td>
145146
</tr>
146147
<tr>
147148
<td>migrate_file</td>
@@ -151,8 +152,9 @@ This section introduce all available spark procedures about paimon.
151152
<li>source_table: name of the origin table to migrate. Cannot be empty.</li>
152153
<li>target_table: name of the target table to be migrated. Cannot be empty.</li>
153154
<li>delete_origin: If had set target_table, can set delete_origin to decide whether delete the origin table metadata from hms after migrate. Default is true</li>
155+
<li>parallelism: the parallelism for migrate process, default is core numbers of machine.</li>
154156
</td>
155-
<td>CALL sys.migrate_file(source_type => 'hive', table => 'default.T', delete_origin => true)</td>
157+
<td>CALL sys.migrate_file(source_type => 'hive', table => 'default.T', delete_origin => true, parallelism => 6)</td>
156158
</tr>
157159
<tr>
158160
<td>remove_orphan_files</td>

paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateDatabaseProcedure.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,31 @@ public String[] call(
5151
connector,
5252
catalog,
5353
sourceDatabasePath,
54+
Runtime.getRuntime().availableProcessors(),
55+
ParameterUtils.parseCommaSeparatedKeyValues(properties));
56+
57+
for (Migrator migrator : migrators) {
58+
migrator.executeMigrate();
59+
migrator.renameTable(false);
60+
}
61+
62+
return new String[] {"Success"};
63+
}
64+
65+
public String[] call(
66+
ProcedureContext procedureContext,
67+
String connector,
68+
String sourceDatabasePath,
69+
String properties,
70+
Integer parallelism)
71+
throws Exception {
72+
Integer p = parallelism == null ? Runtime.getRuntime().availableProcessors() : parallelism;
73+
List<Migrator> migrators =
74+
TableMigrationUtils.getImporters(
75+
connector,
76+
catalog,
77+
sourceDatabasePath,
78+
p,
5479
ParameterUtils.parseCommaSeparatedKeyValues(properties));
5580

5681
for (Migrator migrator : migrators) {

paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,12 @@ public String[] call(
4040
String sourceTablePath,
4141
String targetPaimonTablePath)
4242
throws Exception {
43-
call(procedureContext, connector, sourceTablePath, targetPaimonTablePath, true);
43+
migrateHandle(
44+
connector,
45+
sourceTablePath,
46+
targetPaimonTablePath,
47+
true,
48+
Runtime.getRuntime().availableProcessors());
4449
return new String[] {"Success"};
4550
}
4651

@@ -51,15 +56,34 @@ public String[] call(
5156
String targetPaimonTablePath,
5257
boolean deleteOrigin)
5358
throws Exception {
54-
migrateHandle(connector, sourceTablePath, targetPaimonTablePath, deleteOrigin);
59+
migrateHandle(
60+
connector,
61+
sourceTablePath,
62+
targetPaimonTablePath,
63+
deleteOrigin,
64+
Runtime.getRuntime().availableProcessors());
65+
return new String[] {"Success"};
66+
}
67+
68+
public String[] call(
69+
ProcedureContext procedureContext,
70+
String connector,
71+
String sourceTablePath,
72+
String targetPaimonTablePath,
73+
boolean deleteOrigin,
74+
Integer parallelism)
75+
throws Exception {
76+
Integer p = parallelism == null ? Runtime.getRuntime().availableProcessors() : parallelism;
77+
migrateHandle(connector, sourceTablePath, targetPaimonTablePath, deleteOrigin, p);
5578
return new String[] {"Success"};
5679
}
5780

5881
public void migrateHandle(
5982
String connector,
6083
String sourceTablePath,
6184
String targetPaimonTablePath,
62-
boolean deleteOrigin)
85+
boolean deleteOrigin,
86+
Integer parallelism)
6387
throws Exception {
6488
Identifier sourceTableId = Identifier.fromString(sourceTablePath);
6589
Identifier targetTableId = Identifier.fromString(targetPaimonTablePath);
@@ -77,6 +101,7 @@ public void migrateHandle(
77101
sourceTableId.getObjectName(),
78102
targetTableId.getDatabaseName(),
79103
targetTableId.getObjectName(),
104+
parallelism,
80105
Collections.emptyMap());
81106
importer.deleteOriginTable(deleteOrigin);
82107
importer.executeMigrate();

paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,35 @@ public String[] call(
6262
sourceTableId.getObjectName(),
6363
targetTableId.getDatabaseName(),
6464
targetTableId.getObjectName(),
65+
Runtime.getRuntime().availableProcessors(),
66+
ParameterUtils.parseCommaSeparatedKeyValues(properties))
67+
.executeMigrate();
68+
69+
LOG.info("Last step: rename " + targetTableId + " to " + sourceTableId);
70+
catalog.renameTable(targetTableId, sourceTableId, false);
71+
return new String[] {"Success"};
72+
}
73+
74+
public String[] call(
75+
ProcedureContext procedureContext,
76+
String connector,
77+
String sourceTablePath,
78+
String properties,
79+
Integer parallelism)
80+
throws Exception {
81+
String targetPaimonTablePath = sourceTablePath + PAIMON_SUFFIX;
82+
83+
Identifier sourceTableId = Identifier.fromString(sourceTablePath);
84+
Identifier targetTableId = Identifier.fromString(targetPaimonTablePath);
85+
86+
TableMigrationUtils.getImporter(
87+
connector,
88+
catalog,
89+
sourceTableId.getDatabaseName(),
90+
sourceTableId.getObjectName(),
91+
targetTableId.getDatabaseName(),
92+
targetTableId.getObjectName(),
93+
parallelism,
6594
ParameterUtils.parseCommaSeparatedKeyValues(properties))
6695
.executeMigrate();
6796

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateDatabaseAction.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,24 +29,31 @@ public class MigrateDatabaseAction extends ActionBase {
2929
private final String connector;
3030
private final String hiveDatabaseName;
3131
private final String tableProperties;
32+
private final Integer parallelism;
3233

3334
public MigrateDatabaseAction(
3435
String connector,
3536
String warehouse,
3637
String hiveDatabaseName,
3738
Map<String, String> catalogConfig,
38-
String tableProperties) {
39+
String tableProperties,
40+
Integer parallelism) {
3941
super(warehouse, catalogConfig);
4042
this.connector = connector;
4143
this.hiveDatabaseName = hiveDatabaseName;
4244
this.tableProperties = tableProperties;
45+
this.parallelism = parallelism;
4346
}
4447

4548
@Override
4649
public void run() throws Exception {
4750
MigrateDatabaseProcedure migrateDatabaseProcedure = new MigrateDatabaseProcedure();
4851
migrateDatabaseProcedure.withCatalog(catalog);
4952
migrateDatabaseProcedure.call(
50-
new DefaultProcedureContext(env), connector, hiveDatabaseName, tableProperties);
53+
new DefaultProcedureContext(env),
54+
connector,
55+
hiveDatabaseName,
56+
tableProperties,
57+
parallelism);
5158
}
5259
}

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateDatabaseActionFactory.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ public class MigrateDatabaseActionFactory implements ActionFactory {
2828

2929
private static final String SOURCE_TYPE = "source_type";
3030
private static final String OPTIONS = "options";
31+
private static final String PARALLELISM = "parallelism";
3132

3233
@Override
3334
public String identifier() {
@@ -41,10 +42,16 @@ public Optional<Action> create(MultipleParameterToolAdapter params) {
4142
String sourceHiveDatabase = params.get(DATABASE);
4243
Map<String, String> catalogConfig = optionalConfigMap(params, CATALOG_CONF);
4344
String tableConf = params.get(OPTIONS);
45+
Integer parallelism = Integer.parseInt(params.get(PARALLELISM));
4446

4547
MigrateDatabaseAction migrateDatabaseAction =
4648
new MigrateDatabaseAction(
47-
connector, warehouse, sourceHiveDatabase, catalogConfig, tableConf);
49+
connector,
50+
warehouse,
51+
sourceHiveDatabase,
52+
catalogConfig,
53+
tableConf,
54+
parallelism);
4855
return Optional.of(migrateDatabaseAction);
4956
}
5057

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateFileAction.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ public class MigrateFileAction extends ActionBase {
3232
private final String targetTable;
3333
private final String tableProperties;
3434
private boolean deleteOrigin;
35+
private Integer parallelism;
3536

3637
public MigrateFileAction(
3738
String connector,
@@ -40,13 +41,15 @@ public MigrateFileAction(
4041
String targetTable,
4142
boolean deleteOrigin,
4243
Map<String, String> catalogConfig,
43-
String tableProperties) {
44+
String tableProperties,
45+
Integer parallelism) {
4446
super(warehouse, catalogConfig);
4547
this.connector = connector;
4648
this.sourceTable = sourceTable;
4749
this.targetTable = targetTable;
4850
this.deleteOrigin = deleteOrigin;
4951
this.tableProperties = tableProperties;
52+
this.parallelism = parallelism;
5053
}
5154

5255
@Override
@@ -58,6 +61,7 @@ public void run() throws Exception {
5861
connector,
5962
sourceTable,
6063
targetTable,
61-
deleteOrigin);
64+
deleteOrigin,
65+
parallelism);
6266
}
6367
}

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateFileActionFactory.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ public class MigrateFileActionFactory implements ActionFactory {
3535
private static final String DELETE_ORIGIN = "delete_origin";
3636

3737
private static final String OPTIONS = "options";
38+
private static final String PARALLELISM = "parallelism";
3839

3940
@Override
4041
public String identifier() {
@@ -50,6 +51,7 @@ public Optional<Action> create(MultipleParameterToolAdapter params) {
5051
boolean deleteOrigin = Boolean.parseBoolean(params.get(DELETE_ORIGIN));
5152
Map<String, String> catalogConfig = optionalConfigMap(params, CATALOG_CONF);
5253
String tableConf = params.get(OPTIONS);
54+
Integer parallelism = Integer.parseInt(params.get(PARALLELISM));
5355

5456
MigrateFileAction migrateFileAction =
5557
new MigrateFileAction(
@@ -59,7 +61,8 @@ public Optional<Action> create(MultipleParameterToolAdapter params) {
5961
targetTable,
6062
deleteOrigin,
6163
catalogConfig,
62-
tableConf);
64+
tableConf,
65+
parallelism);
6366
return Optional.of(migrateFileAction);
6467
}
6568

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableAction.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,24 +30,31 @@ public class MigrateTableAction extends ActionBase {
3030
private final String connector;
3131
private final String hiveTableFullName;
3232
private final String tableProperties;
33+
private final Integer parallelism;
3334

3435
public MigrateTableAction(
3536
String connector,
3637
String warehouse,
3738
String hiveTableFullName,
3839
Map<String, String> catalogConfig,
39-
String tableProperties) {
40+
String tableProperties,
41+
Integer parallelism) {
4042
super(warehouse, catalogConfig);
4143
this.connector = connector;
4244
this.hiveTableFullName = hiveTableFullName;
4345
this.tableProperties = tableProperties;
46+
this.parallelism = parallelism;
4447
}
4548

4649
@Override
4750
public void run() throws Exception {
4851
MigrateTableProcedure migrateTableProcedure = new MigrateTableProcedure();
4952
migrateTableProcedure.withCatalog(catalog);
5053
migrateTableProcedure.call(
51-
new DefaultProcedureContext(env), connector, hiveTableFullName, tableProperties);
54+
new DefaultProcedureContext(env),
55+
connector,
56+
hiveTableFullName,
57+
tableProperties,
58+
parallelism);
5259
}
5360
}

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableActionFactory.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ public class MigrateTableActionFactory implements ActionFactory {
2828

2929
private static final String SOURCE_TYPE = "source_type";
3030
private static final String OPTIONS = "options";
31+
private static final String PARALLELISM = "parallelism";
3132

3233
@Override
3334
public String identifier() {
@@ -41,10 +42,16 @@ public Optional<Action> create(MultipleParameterToolAdapter params) {
4142
String sourceHiveTable = params.get(TABLE);
4243
Map<String, String> catalogConfig = optionalConfigMap(params, CATALOG_CONF);
4344
String tableConf = params.get(OPTIONS);
45+
Integer parallelism = Integer.parseInt(params.get(PARALLELISM));
4446

4547
MigrateTableAction migrateTableAction =
4648
new MigrateTableAction(
47-
connector, warehouse, sourceHiveTable, catalogConfig, tableConf);
49+
connector,
50+
warehouse,
51+
sourceHiveTable,
52+
catalogConfig,
53+
tableConf,
54+
parallelism);
4855
return Optional.of(migrateTableAction);
4956
}
5057

0 commit comments

Comments
 (0)