|
19 | 19 |
|
20 | 20 | package org.apache.iceberg.mr.hive; |
21 | 21 |
|
| 22 | +import com.fasterxml.jackson.databind.ObjectMapper; |
22 | 23 | import java.io.IOException; |
23 | 24 | import java.net.URLDecoder; |
24 | 25 | import java.nio.charset.StandardCharsets; |
|
54 | 55 | import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy; |
55 | 56 | import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; |
56 | 57 | import org.apache.hadoop.hive.ql.QueryState; |
| 58 | +import org.apache.hadoop.hive.ql.ddl.misc.sortoder.SortFieldDesc; |
57 | 59 | import org.apache.hadoop.hive.ql.ddl.table.AlterTableType; |
58 | 60 | import org.apache.hadoop.hive.ql.exec.SerializationUtilities; |
59 | 61 | import org.apache.hadoop.hive.ql.io.AcidUtils; |
|
85 | 87 | import org.apache.iceberg.FileScanTask; |
86 | 88 | import org.apache.iceberg.MetadataTableType; |
87 | 89 | import org.apache.iceberg.MetadataTableUtils; |
| 90 | +import org.apache.iceberg.NullOrder; |
88 | 91 | import org.apache.iceberg.PartitionData; |
89 | 92 | import org.apache.iceberg.PartitionField; |
90 | 93 | import org.apache.iceberg.PartitionSpec; |
91 | 94 | import org.apache.iceberg.PartitionSpecParser; |
92 | 95 | import org.apache.iceberg.PartitionsTable; |
| 96 | +import org.apache.iceberg.ReplaceSortOrder; |
93 | 97 | import org.apache.iceberg.Schema; |
94 | 98 | import org.apache.iceberg.SchemaParser; |
| 99 | +import org.apache.iceberg.SortOrder; |
| 100 | +import org.apache.iceberg.SortOrderParser; |
95 | 101 | import org.apache.iceberg.Table; |
96 | 102 | import org.apache.iceberg.TableMetadata; |
97 | 103 | import org.apache.iceberg.TableMetadataParser; |
|
141 | 147 |
|
142 | 148 | public class HiveIcebergMetaHook extends BaseHiveIcebergMetaHook { |
143 | 149 | private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergMetaHook.class); |
| 150 | + private static final ObjectMapper JSON_OBJECT_MAPPER = new ObjectMapper(); |
144 | 151 | static final EnumSet<AlterTableType> SUPPORTED_ALTER_OPS = EnumSet.of( |
145 | 152 | AlterTableType.ADDCOLS, AlterTableType.REPLACE_COLUMNS, AlterTableType.RENAME_COLUMN, AlterTableType.DROP_COLUMN, |
146 | 153 | AlterTableType.ADDPROPS, AlterTableType.DROPPROPS, AlterTableType.SETPARTITIONSPEC, |
@@ -624,12 +631,55 @@ private void alterTableProperties(org.apache.hadoop.hive.metastore.api.Table hms |
624 | 631 | Map<String, String> hmsTableParameters = hmsTable.getParameters(); |
625 | 632 | Splitter splitter = Splitter.on(PROPERTIES_SEPARATOR); |
626 | 633 | UpdateProperties icebergUpdateProperties = icebergTable.updateProperties(); |
| 634 | + |
627 | 635 | if (contextProperties.containsKey(SET_PROPERTIES)) { |
628 | | - splitter.splitToList(contextProperties.get(SET_PROPERTIES)) |
629 | | - .forEach(k -> icebergUpdateProperties.set(k, hmsTableParameters.get(k))); |
| 636 | + List<String> propertiesToSet = splitter.splitToList(contextProperties.get(SET_PROPERTIES)); |
| 637 | + |
| 638 | + // Check if we are setting a regular sort order, which needs conversion from Hive JSON to an Iceberg SortOrder |
| 639 | + if (propertiesToSet.contains(TableProperties.DEFAULT_SORT_ORDER)) { |
| 640 | + // The HMS table stores the Hive SortFieldDesc list as JSON in default-sort-order; |
| 641 | + // convert it to an Iceberg SortOrder and apply it through the replaceSortOrder() API |
| 642 | + String sortOrderJSONString = hmsTableParameters.get(TableProperties.DEFAULT_SORT_ORDER); |
| 643 | + |
| 644 | + List<SortFieldDesc> sortFieldDescList = parseSortFieldsJSON(sortOrderJSONString); |
| 645 | + if (sortFieldDescList != null) { |
| 646 | + try { |
| 647 | + ReplaceSortOrder replaceSortOrder = icebergTable.replaceSortOrder(); |
| 648 | + |
| 649 | + // Chain all the sort field additions |
| 650 | + for (SortFieldDesc fieldDesc : sortFieldDescList) { |
| 651 | + NullOrder nullOrder = convertNullOrder(fieldDesc.getNullOrdering()); |
| 652 | + |
| 653 | + if (fieldDesc.getDirection() == SortFieldDesc.SortDirection.ASC) { |
| 654 | + replaceSortOrder.asc(fieldDesc.getColumnName(), nullOrder); |
| 655 | + } else { |
| 656 | + replaceSortOrder.desc(fieldDesc.getColumnName(), nullOrder); |
| 657 | + } |
| 658 | + } |
| 659 | + |
| 660 | + replaceSortOrder.commit(); |
| 661 | + |
| 662 | + // Update HMS table parameters with the Iceberg SortOrder JSON |
| 663 | + SortOrder newSortOrder = icebergTable.sortOrder(); |
| 664 | + hmsTableParameters.put(TableProperties.DEFAULT_SORT_ORDER, SortOrderParser.toJson(newSortOrder)); |
| 665 | + |
| 666 | + LOG.info("Successfully set sort order for table {}: {}", hmsTable.getTableName(), newSortOrder); |
| 667 | + } catch (Exception e) { |
| 668 | + LOG.warn("Failed to apply sort order for table {}: {}", hmsTable.getTableName(), sortOrderJSONString, e); |
| 669 | + } |
| 670 | + } |
| 671 | + |
| 672 | + // Set the other properties (excluding default-sort-order, which was already processed) |
| 673 | + propertiesToSet.stream() |
| 674 | + .filter(k -> !k.equals(TableProperties.DEFAULT_SORT_ORDER)) |
| 675 | + .forEach(k -> icebergUpdateProperties.set(k, hmsTableParameters.get(k))); |
| 676 | + } else { |
| 677 | + propertiesToSet.forEach(k -> icebergUpdateProperties.set(k, hmsTableParameters.get(k))); |
| 678 | + } |
630 | 679 | } else if (contextProperties.containsKey(UNSET_PROPERTIES)) { |
631 | 680 | splitter.splitToList(contextProperties.get(UNSET_PROPERTIES)).forEach(icebergUpdateProperties::remove); |
632 | 681 | } |
| 682 | + |
633 | 683 | icebergUpdateProperties.commit(); |
634 | 684 | } |
635 | 685 |
|
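For context, the two helpers called in this hunk, `parseSortFieldsJSON` and `convertNullOrder`, are not shown in the diff; only their call sites are. Below is a minimal sketch of what they might look like, assuming the HMS property holds a JSON array of Hive `SortFieldDesc` objects and that `SortFieldDesc#getNullOrdering()` returns Hive's `NullOrdering` enum. Both the signatures and the JSON shape are assumptions inferred from the call sites, not the actual implementation.

```java
// Sketch only: signatures inferred from the call sites in the hunk above.
// Assumed imports beyond those in the diff: com.fasterxml.jackson.core.type.TypeReference,
// java.util.List, org.apache.hadoop.hive.ql.util.NullOrdering.

// Deserialize the Hive sort-field JSON stored under default-sort-order.
// Returns null when the JSON is absent or unparsable, so the caller can skip
// the sort-order conversion instead of failing the whole ALTER TABLE.
private List<SortFieldDesc> parseSortFieldsJSON(String sortOrderJSONString) {
  if (sortOrderJSONString == null || sortOrderJSONString.isEmpty()) {
    return null;
  }
  try {
    // Assumes the property value is a JSON array of SortFieldDesc objects.
    return JSON_OBJECT_MAPPER.readValue(sortOrderJSONString, new TypeReference<List<SortFieldDesc>>() { });
  } catch (IOException e) {
    LOG.warn("Could not parse Hive sort order JSON: {}", sortOrderJSONString, e);
    return null;
  }
}

// Map Hive's NULLS FIRST/LAST to Iceberg's NullOrder, defaulting to
// NULLS_FIRST when Hive supplies no explicit null ordering.
private NullOrder convertNullOrder(NullOrdering nullOrdering) {
  return nullOrdering == NullOrdering.NULLS_LAST ? NullOrder.NULLS_LAST : NullOrder.NULLS_FIRST;
}
```

Whatever the real implementation looks like, the contract the hunk relies on is that a null return from `parseSortFieldsJSON` means "no convertible Hive sort order", which is exactly what the `if (sortFieldDescList != null)` guard checks before touching the Iceberg table.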
|