-
Notifications
You must be signed in to change notification settings - Fork 4.8k
HIVE-29345: Support Alter table command for write ordering. #6256
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -51,7 +51,6 @@ | |
| import org.apache.iceberg.PartitionSpecParser; | ||
| import org.apache.iceberg.Schema; | ||
| import org.apache.iceberg.SchemaParser; | ||
| import org.apache.iceberg.SortDirection; | ||
| import org.apache.iceberg.SortOrder; | ||
| import org.apache.iceberg.SortOrderParser; | ||
| import org.apache.iceberg.Table; | ||
|
|
@@ -238,7 +237,7 @@ private void validateCatalogConfigsDefined() { | |
| * - Otherwise, the JSON is a list of SortFields; we convert it to Iceberg | ||
| * SortOrder JSON and keep it in DEFAULT_SORT_ORDER for Iceberg to use it. | ||
| */ | ||
| private void setSortOrder(org.apache.hadoop.hive.metastore.api.Table hmsTable, Schema schema, | ||
| protected void setSortOrder(org.apache.hadoop.hive.metastore.api.Table hmsTable, Schema schema, | ||
| Properties properties) { | ||
| String sortOrderJSONString = hmsTable.getParameters().get(TableProperties.DEFAULT_SORT_ORDER); | ||
| if (Strings.isNullOrEmpty(sortOrderJSONString)) { | ||
|
|
@@ -251,21 +250,16 @@ private void setSortOrder(org.apache.hadoop.hive.metastore.api.Table hmsTable, S | |
| return; | ||
| } | ||
|
|
||
| try { | ||
| SortFields sortFields = JSON_OBJECT_MAPPER.reader().readValue(sortOrderJSONString, SortFields.class); | ||
| if (sortFields != null && !sortFields.getSortFields().isEmpty()) { | ||
| SortOrder.Builder sortOrderBuilder = SortOrder.builderFor(schema); | ||
| sortFields.getSortFields().forEach(fieldDesc -> { | ||
| NullOrder nullOrder = fieldDesc.getNullOrdering() == NullOrdering.NULLS_FIRST ? | ||
| NullOrder.NULLS_FIRST : NullOrder.NULLS_LAST; | ||
| SortDirection sortDirection = fieldDesc.getDirection() == SortFieldDesc.SortDirection.ASC ? | ||
| SortDirection.ASC : SortDirection.DESC; | ||
| sortOrderBuilder.sortBy(fieldDesc.getColumnName(), sortDirection, nullOrder); | ||
| }); | ||
| properties.put(TableProperties.DEFAULT_SORT_ORDER, SortOrderParser.toJson(sortOrderBuilder.build())); | ||
| } | ||
| } catch (Exception e) { | ||
| LOG.warn("Can not read write order json: {}", sortOrderJSONString); | ||
| List<SortFieldDesc> sortFieldDescList = parseSortFieldsJSON(sortOrderJSONString); | ||
| if (sortFieldDescList != null) { | ||
| SortOrder.Builder sortOrderBuilder = SortOrder.builderFor(schema); | ||
| sortFieldDescList.forEach(fieldDesc -> { | ||
| sortOrderBuilder.sortBy( | ||
| fieldDesc.getColumnName(), | ||
| convertSortDirection(fieldDesc.getDirection()), | ||
| convertNullOrder(fieldDesc.getNullOrdering())); | ||
| }); | ||
| properties.put(TableProperties.DEFAULT_SORT_ORDER, SortOrderParser.toJson(sortOrderBuilder.build())); | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -304,6 +298,44 @@ private boolean isZOrderJSON(String jsonString) { | |
| } | ||
| } | ||
|
|
||
| /** | ||
| * Parses Hive SortFields JSON and returns the list of sort field descriptors. | ||
| * This is a common utility method used by both CREATE TABLE and ALTER TABLE flows. | ||
| * | ||
| * @param sortOrderJSONString The JSON string containing Hive SortFields | ||
| * @return List of SortFieldDesc, or null if parsing fails or JSON is empty | ||
| */ | ||
| protected List<SortFieldDesc> parseSortFieldsJSON(String sortOrderJSONString) { | ||
| if (Strings.isNullOrEmpty(sortOrderJSONString) || isZOrderJSON(sortOrderJSONString)) { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why do we check |
||
| return null; | ||
| } | ||
|
|
||
| try { | ||
| SortFields sortFields = JSON_OBJECT_MAPPER.reader().readValue(sortOrderJSONString, SortFields.class); | ||
| if (sortFields != null && !sortFields.getSortFields().isEmpty()) { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why do we need |
||
| return sortFields.getSortFields(); | ||
| } | ||
| } catch (Exception e) { | ||
| LOG.warn("Failed to parse sort order JSON: {}", sortOrderJSONString, e); | ||
| } | ||
| return null; | ||
| } | ||
|
|
||
| /** | ||
| * Converts Hive NullOrdering to Iceberg NullOrder. | ||
| */ | ||
| protected static NullOrder convertNullOrder(NullOrdering nullOrdering) { | ||
| return nullOrdering == NullOrdering.NULLS_FIRST ? NullOrder.NULLS_FIRST : NullOrder.NULLS_LAST; | ||
| } | ||
|
|
||
| /** | ||
| * Converts Hive SortDirection to Iceberg SortDirection. | ||
| */ | ||
| protected static org.apache.iceberg.SortDirection convertSortDirection(SortFieldDesc.SortDirection direction) { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. protected? |
||
| return direction == SortFieldDesc.SortDirection.ASC ? | ||
| org.apache.iceberg.SortDirection.ASC : org.apache.iceberg.SortDirection.DESC; | ||
| } | ||
|
|
||
| @Override | ||
| public void rollbackCreateTable(org.apache.hadoop.hive.metastore.api.Table hmsTable) { | ||
| // do nothing | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,6 +19,7 @@ | |
|
|
||
| package org.apache.iceberg.mr.hive; | ||
|
|
||
| import com.fasterxml.jackson.databind.ObjectMapper; | ||
| import java.io.IOException; | ||
| import java.net.URLDecoder; | ||
| import java.nio.charset.StandardCharsets; | ||
|
|
@@ -54,6 +55,7 @@ | |
| import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy; | ||
| import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; | ||
| import org.apache.hadoop.hive.ql.QueryState; | ||
| import org.apache.hadoop.hive.ql.ddl.misc.sortoder.SortFieldDesc; | ||
| import org.apache.hadoop.hive.ql.ddl.table.AlterTableType; | ||
| import org.apache.hadoop.hive.ql.exec.SerializationUtilities; | ||
| import org.apache.hadoop.hive.ql.io.AcidUtils; | ||
|
|
@@ -85,13 +87,17 @@ | |
| import org.apache.iceberg.FileScanTask; | ||
| import org.apache.iceberg.MetadataTableType; | ||
| import org.apache.iceberg.MetadataTableUtils; | ||
| import org.apache.iceberg.NullOrder; | ||
| import org.apache.iceberg.PartitionData; | ||
| import org.apache.iceberg.PartitionField; | ||
| import org.apache.iceberg.PartitionSpec; | ||
| import org.apache.iceberg.PartitionSpecParser; | ||
| import org.apache.iceberg.PartitionsTable; | ||
| import org.apache.iceberg.ReplaceSortOrder; | ||
| import org.apache.iceberg.Schema; | ||
| import org.apache.iceberg.SchemaParser; | ||
| import org.apache.iceberg.SortOrder; | ||
| import org.apache.iceberg.SortOrderParser; | ||
| import org.apache.iceberg.Table; | ||
| import org.apache.iceberg.TableMetadata; | ||
| import org.apache.iceberg.TableMetadataParser; | ||
|
|
@@ -141,6 +147,7 @@ | |
|
|
||
| public class HiveIcebergMetaHook extends BaseHiveIcebergMetaHook { | ||
| private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergMetaHook.class); | ||
| private static final ObjectMapper JSON_OBJECT_MAPPER = new ObjectMapper(); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why do we need this? |
||
| static final EnumSet<AlterTableType> SUPPORTED_ALTER_OPS = EnumSet.of( | ||
| AlterTableType.ADDCOLS, AlterTableType.REPLACE_COLUMNS, AlterTableType.RENAME_COLUMN, AlterTableType.DROP_COLUMN, | ||
| AlterTableType.ADDPROPS, AlterTableType.DROPPROPS, AlterTableType.SETPARTITIONSPEC, | ||
|
|
@@ -624,12 +631,55 @@ private void alterTableProperties(org.apache.hadoop.hive.metastore.api.Table hms | |
| Map<String, String> hmsTableParameters = hmsTable.getParameters(); | ||
| Splitter splitter = Splitter.on(PROPERTIES_SEPARATOR); | ||
| UpdateProperties icebergUpdateProperties = icebergTable.updateProperties(); | ||
|
|
||
| if (contextProperties.containsKey(SET_PROPERTIES)) { | ||
| splitter.splitToList(contextProperties.get(SET_PROPERTIES)) | ||
| .forEach(k -> icebergUpdateProperties.set(k, hmsTableParameters.get(k))); | ||
| List<String> propertiesToSet = splitter.splitToList(contextProperties.get(SET_PROPERTIES)); | ||
|
|
||
| // Check if we are setting regular sort order as it needs conversion from Hive JSON to Iceberg SortOrder | ||
| if (propertiesToSet.contains(TableProperties.DEFAULT_SORT_ORDER)) { | ||
| // If the HMS table has Hive SortFields JSON in default-sort-order | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe extract this logic into a separate method. also consider |
||
| // We need to convert it to Iceberg SortOrder and use replaceSortOrder() API | ||
| String sortOrderJSONString = hmsTableParameters.get(TableProperties.DEFAULT_SORT_ORDER); | ||
|
|
||
| List<SortFieldDesc> sortFieldDescList = parseSortFieldsJSON(sortOrderJSONString); | ||
| if (sortFieldDescList != null) { | ||
| try { | ||
| ReplaceSortOrder replaceSortOrder = icebergTable.replaceSortOrder(); | ||
|
|
||
| // Chain all the sort field additions | ||
| for (SortFieldDesc fieldDesc : sortFieldDescList) { | ||
| NullOrder nullOrder = convertNullOrder(fieldDesc.getNullOrdering()); | ||
|
|
||
| if (fieldDesc.getDirection() == SortFieldDesc.SortDirection.ASC) { | ||
| replaceSortOrder.asc(fieldDesc.getColumnName(), nullOrder); | ||
| } else { | ||
| replaceSortOrder.desc(fieldDesc.getColumnName(), nullOrder); | ||
| } | ||
| } | ||
|
|
||
| replaceSortOrder.commit(); | ||
|
|
||
| // Update HMS table parameters with the Iceberg SortOrder JSON | ||
| SortOrder newSortOrder = icebergTable.sortOrder(); | ||
| hmsTableParameters.put(TableProperties.DEFAULT_SORT_ORDER, SortOrderParser.toJson(newSortOrder)); | ||
|
|
||
| LOG.info("Successfully set sort order for table {}: {}", hmsTable.getTableName(), newSortOrder); | ||
| } catch (Exception e) { | ||
| LOG.warn("Failed to apply sort order for table {}: {}", hmsTable.getTableName(), sortOrderJSONString, e); | ||
| } | ||
| } | ||
|
|
||
| // Set other properties (excluding default-sort-order, which is already processed) | ||
| propertiesToSet.stream() | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why add filter here and not handle |
||
| .filter(k -> !k.equals(TableProperties.DEFAULT_SORT_ORDER)) | ||
| .forEach(k -> icebergUpdateProperties.set(k, hmsTableParameters.get(k))); | ||
| } else { | ||
| propertiesToSet.forEach(k -> icebergUpdateProperties.set(k, hmsTableParameters.get(k))); | ||
| } | ||
| } else if (contextProperties.containsKey(UNSET_PROPERTIES)) { | ||
| splitter.splitToList(contextProperties.get(UNSET_PROPERTIES)).forEach(icebergUpdateProperties::remove); | ||
| } | ||
|
|
||
| icebergUpdateProperties.commit(); | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,34 @@ | ||
| -- Mask neededVirtualColumns due to non-strict order | ||
| --! qt:replace:/(\s+neededVirtualColumns:\s)(.*)/$1#Masked#/ | ||
| -- Mask the totalSize value as it can have slight variability, causing test flakiness | ||
| --! qt:replace:/(\s+totalSize\s+)\S+(\s+)/$1#Masked#$2/ | ||
| -- Mask random uuid | ||
| --! qt:replace:/(\s+uuid\s+)\S+(\s*)/$1#Masked#$2/ | ||
| -- Mask a random snapshot id | ||
| --! qt:replace:/(\s+current-snapshot-id\s+)\S+(\s*)/$1#Masked#/ | ||
| -- Mask added file size | ||
| --! qt:replace:/(\S\"added-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/ | ||
| -- Mask total file size | ||
| --! qt:replace:/(\S\"total-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/ | ||
| -- Mask removed file size | ||
| --! qt:replace:/(\S\"removed-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/ | ||
| -- Mask current-snapshot-timestamp-ms | ||
| --! qt:replace:/(\s+current-snapshot-timestamp-ms\s+)\S+(\s*)/$1#Masked#$2/ | ||
| --! qt:replace:/(MAJOR\s+succeeded\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/ | ||
| -- Mask iceberg version | ||
| --! qt:replace:/(\S\"iceberg-version\\\":\\\")(\w+\s\w+\s\d+\.\d+\.\d+\s\(\w+\s\w+\))(\\\")/$1#Masked#$3/ | ||
| set hive.vectorized.execution.enabled=true; | ||
|
|
||
| -- Test ALTER TABLE SET WRITE [LOCALLY] ORDERED BY | ||
|
|
||
| create table ice_orc_order (id int, name string, age int, city string) stored by iceberg stored as orc; | ||
| describe formatted ice_orc_order; | ||
|
|
||
| alter table ice_orc_order set write ordered by id desc nulls first, name asc nulls last; | ||
| describe formatted ice_orc_order; | ||
|
|
||
| explain insert into ice_orc_order values (4, 'David', 28, 'Seattle'),(5, 'Eve', 32, 'Boston'),(6, 'Frank', 29, 'Austin'),(7, 'Grace', 32, 'Denver'); | ||
| insert into ice_orc_order values (4, 'David', 28, 'Seattle'),(5, 'Eve', 32, 'Boston'),(6, 'Frank', 29, 'Austin'),(7, 'Grace', 32, 'Denver'); | ||
| select * from ice_orc_order; | ||
|
|
||
| drop table ice_orc_order; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why protected?