Skip to content

Commit a3f4ab3

Browse files
authored
[core] Reduce get partition requests in migrating hive table (#5364)
1 parent c8a6464 commit a3f4ab3

File tree

2 files changed

+11
-21
lines changed

2 files changed

+11
-21
lines changed

paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,7 @@ private static DataFileMeta constructFileMeta(
240240

241241
public static BinaryRow writePartitionValue(
242242
RowType partitionRowType,
243-
Map<String, String> partitionValues,
243+
List<String> partitionValues,
244244
List<BinaryWriter.ValueSetter> valueSetters,
245245
String partitionDefaultName) {
246246

@@ -250,7 +250,7 @@ public static BinaryRow writePartitionValue(
250250
List<DataField> fields = partitionRowType.getFields();
251251

252252
for (int i = 0; i < fields.size(); i++) {
253-
String partitionName = partitionValues.get(fields.get(i).name());
253+
String partitionName = partitionValues.get(i);
254254
if (partitionName.equals(partitionDefaultName)) {
255255
binaryRowWriter.setNullAt(i);
256256
} else {

paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java

Lines changed: 9 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -163,25 +163,20 @@ public void executeMigrate() throws Exception {
163163
FileStoreTable paimonTable = (FileStoreTable) hiveCatalog.getTable(identifier);
164164
checkPaimonTable(paimonTable);
165165

166-
List<String> partitionsNames =
167-
client.listPartitionNames(sourceDatabase, sourceTable, Short.MAX_VALUE);
166+
List<Partition> partitions =
167+
client.listPartitions(sourceDatabase, sourceTable, Short.MAX_VALUE);
168168
checkCompatible(sourceHiveTable, paimonTable);
169169

170170
List<MigrateTask> tasks = new ArrayList<>();
171171
Map<Path, Path> rollBack = new ConcurrentHashMap<>();
172-
if (partitionsNames.isEmpty()) {
172+
if (partitions.isEmpty()) {
173173
tasks.add(
174174
importUnPartitionedTableTask(
175175
fileIO, sourceHiveTable, paimonTable, rollBack));
176176
} else {
177177
tasks.addAll(
178178
importPartitionedTableTask(
179-
client,
180-
fileIO,
181-
partitionsNames,
182-
sourceHiveTable,
183-
paimonTable,
184-
rollBack));
179+
fileIO, partitions, sourceHiveTable, paimonTable, rollBack));
185180
}
186181

187182
List<Future<CommitMessage>> futures =
@@ -292,13 +287,11 @@ public Schema from(
292287
}
293288

294289
private List<MigrateTask> importPartitionedTableTask(
295-
IMetaStoreClient client,
296290
FileIO fileIO,
297-
List<String> partitionNames,
291+
List<Partition> partitions,
298292
Table sourceTable,
299293
FileStoreTable paimonTable,
300-
Map<Path, Path> rollback)
301-
throws Exception {
294+
Map<Path, Path> rollback) {
302295
List<MigrateTask> migrateTasks = new ArrayList<>();
303296
List<BinaryWriter.ValueSetter> valueSetters = new ArrayList<>();
304297

@@ -309,17 +302,14 @@ private List<MigrateTask> importPartitionedTableTask(
309302
.getFieldTypes()
310303
.forEach(type -> valueSetters.add(BinaryWriter.createValueSetter(type)));
311304

312-
for (String partitionName : partitionNames) {
313-
Partition partition =
314-
client.getPartition(
315-
sourceTable.getDbName(), sourceTable.getTableName(), partitionName);
316-
Map<String, String> values = client.partitionNameToSpec(partitionName);
305+
for (Partition partition : partitions) {
306+
List<String> partitionValues = partition.getValues();
317307
String format = parseFormat(partition.getSd().getSerdeInfo().toString());
318308
String location = partition.getSd().getLocation();
319309
BinaryRow partitionRow =
320310
FileMetaUtils.writePartitionValue(
321311
partitionRowType,
322-
values,
312+
partitionValues,
323313
valueSetters,
324314
coreOptions.partitionDefaultName());
325315
Path path = paimonTable.store().pathFactory().bucketPath(partitionRow, 0);

0 commit comments

Comments
 (0)