diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/BaseHiveIcebergMetaHook.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/BaseHiveIcebergMetaHook.java
index 5d83155b4ea8..81801d6b033c 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/BaseHiveIcebergMetaHook.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/BaseHiveIcebergMetaHook.java
@@ -51,7 +51,6 @@
 import org.apache.iceberg.PartitionSpecParser;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SchemaParser;
-import org.apache.iceberg.SortDirection;
 import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.SortOrderParser;
 import org.apache.iceberg.Table;
@@ -251,21 +250,16 @@ private void setSortOrder(org.apache.hadoop.hive.metastore.api.Table hmsTable, S
       return;
     }
 
-    try {
-      SortFields sortFields = JSON_OBJECT_MAPPER.reader().readValue(sortOrderJSONString, SortFields.class);
-      if (sortFields != null && !sortFields.getSortFields().isEmpty()) {
-        SortOrder.Builder sortOrderBuilder = SortOrder.builderFor(schema);
-        sortFields.getSortFields().forEach(fieldDesc -> {
-          NullOrder nullOrder = fieldDesc.getNullOrdering() == NullOrdering.NULLS_FIRST ?
-              NullOrder.NULLS_FIRST : NullOrder.NULLS_LAST;
-          SortDirection sortDirection = fieldDesc.getDirection() == SortFieldDesc.SortDirection.ASC ?
-              SortDirection.ASC : SortDirection.DESC;
-          sortOrderBuilder.sortBy(fieldDesc.getColumnName(), sortDirection, nullOrder);
-        });
-        properties.put(TableProperties.DEFAULT_SORT_ORDER, SortOrderParser.toJson(sortOrderBuilder.build()));
-      }
-    } catch (Exception e) {
-      LOG.warn("Can not read write order json: {}", sortOrderJSONString);
+    List<SortFieldDesc> sortFieldDescList = parseSortFieldsJSON(sortOrderJSONString);
+    if (sortFieldDescList != null) {
+      SortOrder.Builder sortOrderBuilder = SortOrder.builderFor(schema);
+      sortFieldDescList.forEach(fieldDesc ->
+          sortOrderBuilder.sortBy(
+              fieldDesc.getColumnName(),
+              convertSortDirection(fieldDesc.getDirection()),
+              convertNullOrder(fieldDesc.getNullOrdering()))
+      );
+      properties.put(TableProperties.DEFAULT_SORT_ORDER, SortOrderParser.toJson(sortOrderBuilder.build()));
     }
   }
 
@@ -304,6 +298,44 @@ private boolean isZOrderJSON(String jsonString) {
     }
   }
 
+  /**
+   * Parses Hive SortFields JSON and returns the list of sort field descriptors.
+   * This is a common utility method used by both CREATE TABLE and ALTER TABLE flows.
+   *
+   * @param sortOrderJSONString The JSON string containing Hive SortFields
+   * @return List of SortFieldDesc, or null if parsing fails or JSON is empty
+   */
+  protected List<SortFieldDesc> parseSortFieldsJSON(String sortOrderJSONString) {
+    if (Strings.isNullOrEmpty(sortOrderJSONString)) {
+      return null;
+    }
+
+    try {
+      SortFields sortFields = JSON_OBJECT_MAPPER.reader().readValue(sortOrderJSONString, SortFields.class);
+      if (sortFields != null && !sortFields.getSortFields().isEmpty()) {
+        return sortFields.getSortFields();
+      }
+    } catch (Exception e) {
+      LOG.warn("Failed to parse sort order JSON: {}", sortOrderJSONString, e);
+    }
+    return null;
+  }
+
+  /**
+   * Converts Hive NullOrdering to Iceberg NullOrder.
+   */
+  protected static NullOrder convertNullOrder(NullOrdering nullOrdering) {
+    return nullOrdering == NullOrdering.NULLS_FIRST ? NullOrder.NULLS_FIRST : NullOrder.NULLS_LAST;
+  }
+
+  /**
+   * Converts Hive SortDirection to Iceberg SortDirection.
+   */
+  private static org.apache.iceberg.SortDirection convertSortDirection(SortFieldDesc.SortDirection direction) {
+    return direction == SortFieldDesc.SortDirection.ASC ?
+        org.apache.iceberg.SortDirection.ASC : org.apache.iceberg.SortDirection.DESC;
+  }
+
   @Override
   public void rollbackCreateTable(org.apache.hadoop.hive.metastore.api.Table hmsTable) {
     // do nothing
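For reference, the conversion that the refactored `setSortOrder` path performs can be reproduced with the Iceberg API alone. The sketch below is illustrative only (the schema, field ids and column names are invented), but it shows how a `SortOrder` built via `SortOrder.builderFor(...)` becomes the JSON that later shows up as the table's `default-sort-order` parameter in the DESCRIBE output further down.

```java
import org.apache.iceberg.NullOrder;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortDirection;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.SortOrderParser;
import org.apache.iceberg.types.Types;

public class SortOrderConversionSketch {
  public static void main(String[] args) {
    // Invented schema; the field ids (1, 2) are what appear as "source-id" in the JSON.
    Schema schema = new Schema(
        Types.NestedField.optional(1, "id", Types.IntegerType.get()),
        Types.NestedField.optional(2, "name", Types.StringType.get()));

    // Equivalent of what setSortOrder() now builds for: id DESC NULLS FIRST, name ASC NULLS LAST
    SortOrder sortOrder = SortOrder.builderFor(schema)
        .sortBy("id", SortDirection.DESC, NullOrder.NULLS_FIRST)
        .sortBy("name", SortDirection.ASC, NullOrder.NULLS_LAST)
        .build();

    // Prints the Iceberg sort-order JSON that the metahook stores as a table property.
    System.out.println(SortOrderParser.toJson(sortOrder));
  }
}
```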
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
index 7f77d7dde9e0..235ff950b113 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
@@ -29,6 +29,7 @@
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -54,6 +55,7 @@
 import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.ddl.misc.sortoder.SortFieldDesc;
 import org.apache.hadoop.hive.ql.ddl.table.AlterTableType;
 import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
@@ -85,13 +87,17 @@
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.MetadataTableType;
 import org.apache.iceberg.MetadataTableUtils;
+import org.apache.iceberg.NullOrder;
 import org.apache.iceberg.PartitionData;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.PartitionSpecParser;
 import org.apache.iceberg.PartitionsTable;
+import org.apache.iceberg.ReplaceSortOrder;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.SortOrderParser;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableMetadataParser;
@@ -624,15 +630,65 @@ private void alterTableProperties(org.apache.hadoop.hive.metastore.api.Table hms
     Map<String, String> hmsTableParameters = hmsTable.getParameters();
     Splitter splitter = Splitter.on(PROPERTIES_SEPARATOR);
     UpdateProperties icebergUpdateProperties = icebergTable.updateProperties();
+
     if (contextProperties.containsKey(SET_PROPERTIES)) {
-      splitter.splitToList(contextProperties.get(SET_PROPERTIES))
-          .forEach(k -> icebergUpdateProperties.set(k, hmsTableParameters.get(k)));
+      List<String> propertiesToSet = splitter.splitToList(contextProperties.get(SET_PROPERTIES));
+
+      // Define handlers for properties that need special processing
+      Map<String, Consumer<String>> propertyHandlers = Maps.newHashMap();
+      propertyHandlers.put(TableProperties.DEFAULT_SORT_ORDER,
+          key -> handleDefaultSortOrder(hmsTable, hmsTableParameters));
+
+      // Process each property using handlers or default behavior
+      propertiesToSet.forEach(key ->
+          propertyHandlers.getOrDefault(key,
+              k -> icebergUpdateProperties.set(k, hmsTableParameters.get(k))
+          ).accept(key)
+      );
     } else if (contextProperties.containsKey(UNSET_PROPERTIES)) {
       splitter.splitToList(contextProperties.get(UNSET_PROPERTIES)).forEach(icebergUpdateProperties::remove);
     }
+
     icebergUpdateProperties.commit();
   }
 
+  /**
+   * Handles conversion of Hive SortFields JSON to Iceberg SortOrder.
+   * Uses Iceberg's replaceSortOrder() API to properly handle the reserved property.
+   */
+  private void handleDefaultSortOrder(org.apache.hadoop.hive.metastore.api.Table hmsTable,
+      Map<String, String> hmsTableParameters) {
+    String sortOrderJSONString = hmsTableParameters.get(TableProperties.DEFAULT_SORT_ORDER);
+
+    List<SortFieldDesc> sortFieldDescList = parseSortFieldsJSON(sortOrderJSONString);
+    if (sortFieldDescList != null) {
+      try {
+        ReplaceSortOrder replaceSortOrder = icebergTable.replaceSortOrder();
+
+        // Chain all the sort field additions
+        for (SortFieldDesc fieldDesc : sortFieldDescList) {
+          NullOrder nullOrder = convertNullOrder(fieldDesc.getNullOrdering());
+
+          if (fieldDesc.getDirection() == SortFieldDesc.SortDirection.ASC) {
+            replaceSortOrder.asc(fieldDesc.getColumnName(), nullOrder);
+          } else {
+            replaceSortOrder.desc(fieldDesc.getColumnName(), nullOrder);
+          }
+        }
+
+        replaceSortOrder.commit();
+
+        // Update HMS table parameters with the Iceberg SortOrder JSON
+        SortOrder newSortOrder = icebergTable.sortOrder();
+        hmsTableParameters.put(TableProperties.DEFAULT_SORT_ORDER, SortOrderParser.toJson(newSortOrder));
+
+        LOG.info("Successfully set sort order for table {}: {}", hmsTable.getTableName(), newSortOrder);
+      } catch (Exception e) {
+        LOG.warn("Failed to apply sort order for table {}: {}", hmsTable.getTableName(), sortOrderJSONString, e);
+      }
+    }
+  }
+
   private void setupAlterOperationType(org.apache.hadoop.hive.metastore.api.Table hmsTable,
       EnvironmentContext context) throws MetaException {
     TableName tableName = new TableName(hmsTable.getCatName(), hmsTable.getDbName(), hmsTable.getTableName());
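The `Map<String, Consumer<String>>` lookup above keeps the common case (a plain `UpdateProperties.set`) untouched while routing keys that need special treatment, such as the reserved `default-sort-order` property, to a dedicated handler. A stripped-down, self-contained illustration of that `getOrDefault` dispatch; all keys, values and messages here are placeholders, not Hive or Iceberg APIs:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class PropertyDispatchSketch {
  public static void main(String[] args) {
    Map<String, String> hmsParams = new HashMap<>();
    hmsParams.put("default-sort-order", "{...Hive SortFields JSON...}"); // reserved, needs conversion
    hmsParams.put("write.format.default", "orc");                        // ordinary property

    // Special-case handlers; anything not listed falls through to the default action.
    Map<String, Consumer<String>> handlers = new HashMap<>();
    handlers.put("default-sort-order",
        key -> System.out.println("convert " + key + " via replaceSortOrder()"));

    Consumer<String> defaultAction =
        key -> System.out.println("set " + key + " = " + hmsParams.get(key));

    List.of("default-sort-order", "write.format.default")
        .forEach(key -> handlers.getOrDefault(key, defaultAction).accept(key));
  }
}
```

The same shape makes it easy to register further special-cased properties later without touching the default branch.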
diff --git a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_alter_locally_ordered_table.q b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_alter_locally_ordered_table.q
new file mode 100644
index 000000000000..01835013b281
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_alter_locally_ordered_table.q
@@ -0,0 +1,34 @@
+-- Mask neededVirtualColumns due to non-strict order
+--! qt:replace:/(\s+neededVirtualColumns:\s)(.*)/$1#Masked#/
+-- Mask the totalSize value as it can have slight variability, causing test flakiness
+--! qt:replace:/(\s+totalSize\s+)\S+(\s+)/$1#Masked#$2/
+-- Mask random uuid
+--! qt:replace:/(\s+uuid\s+)\S+(\s*)/$1#Masked#$2/
+-- Mask a random snapshot id
+--! qt:replace:/(\s+current-snapshot-id\s+)\S+(\s*)/$1#Masked#/
+-- Mask added file size
+--! qt:replace:/(\S\"added-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
+-- Mask total file size
+--! qt:replace:/(\S\"total-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
+-- Mask removed file size
+--! qt:replace:/(\S\"removed-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
+-- Mask current-snapshot-timestamp-ms
+--! qt:replace:/(\s+current-snapshot-timestamp-ms\s+)\S+(\s*)/$1#Masked#$2/
+--! qt:replace:/(MAJOR\s+succeeded\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/
+-- Mask iceberg version
+--!
qt:replace:/(\S\"iceberg-version\\\":\\\")(\w+\s\w+\s\d+\.\d+\.\d+\s\(\w+\s\w+\))(\\\")/$1#Masked#$3/ +set hive.vectorized.execution.enabled=true; + +-- Test ALTER TABLE SET WRITE [LOCALLY] ORDERED BY + +create table ice_orc_order (id int, name string, age int, city string) stored by iceberg stored as orc; +describe formatted ice_orc_order; + +alter table ice_orc_order set write ordered by id desc nulls first, name asc nulls last; +describe formatted ice_orc_order; + +explain insert into ice_orc_order values (4, 'David', 28, 'Seattle'),(5, 'Eve', 32, 'Boston'),(6, 'Frank', 29, 'Austin'),(7, 'Grace', 32, 'Denver'); +insert into ice_orc_order values (4, 'David', 28, 'Seattle'),(5, 'Eve', 32, 'Boston'),(6, 'Frank', 29, 'Austin'),(7, 'Grace', 32, 'Denver'); +select * from ice_orc_order; + +drop table ice_orc_order; diff --git a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_alter_locally_ordered_table.q.out b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_alter_locally_ordered_table.q.out new file mode 100644 index 000000000000..e42a27153cba --- /dev/null +++ b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_alter_locally_ordered_table.q.out @@ -0,0 +1,266 @@ +PREHOOK: query: create table ice_orc_order (id int, name string, age int, city string) stored by iceberg stored as orc +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@ice_orc_order +POSTHOOK: query: create table ice_orc_order (id int, name string, age int, city string) stored by iceberg stored as orc +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@ice_orc_order +PREHOOK: query: describe formatted ice_orc_order +PREHOOK: type: DESCTABLE +PREHOOK: Input: default@ice_orc_order +POSTHOOK: query: describe formatted ice_orc_order +POSTHOOK: type: DESCTABLE +POSTHOOK: Input: default@ice_orc_order +# col_name data_type comment +id int +name string +age int +city string + +# Detailed Table Information +Database: default +#### A masked pattern was here #### +Retention: 0 +#### A masked pattern was here #### +Table Type: EXTERNAL_TABLE +Table Parameters: + COLUMN_STATS_ACCURATE {\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"age\":\"true\",\"city\":\"true\",\"id\":\"true\",\"name\":\"true\"}} + EXTERNAL TRUE + bucketing_version 2 + current-schema {\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"id\",\"required\":false,\"type\":\"int\"},{\"id\":2,\"name\":\"name\",\"required\":false,\"type\":\"string\"},{\"id\":3,\"name\":\"age\",\"required\":false,\"type\":\"int\"},{\"id\":4,\"name\":\"city\",\"required\":false,\"type\":\"string\"}]} + format-version 2 + iceberg.orc.files.only true +#### A masked pattern was here #### + numFiles 0 + numRows 0 + parquet.compression zstd + rawDataSize 0 + serialization.format 1 + snapshot-count 0 + storage_handler org.apache.iceberg.mr.hive.HiveIcebergStorageHandler + table_type ICEBERG + totalSize #Masked# +#### A masked pattern was here #### + uuid #Masked# + write.delete.mode merge-on-read + write.format.default orc + write.merge.mode merge-on-read + write.metadata.delete-after-commit.enabled true + write.update.mode merge-on-read + +# Storage Information +SerDe Library: org.apache.iceberg.mr.hive.HiveIcebergSerDe +InputFormat: org.apache.iceberg.mr.hive.HiveIcebergInputFormat +OutputFormat: org.apache.iceberg.mr.hive.HiveIcebergOutputFormat +Compressed: No +Sort Columns: [] +PREHOOK: query: alter table ice_orc_order set write ordered by id desc nulls first, name 
asc nulls last +PREHOOK: type: ALTERTABLE_SET_WRITE_ORDER +PREHOOK: Input: default@ice_orc_order +PREHOOK: Output: default@ice_orc_order +POSTHOOK: query: alter table ice_orc_order set write ordered by id desc nulls first, name asc nulls last +POSTHOOK: type: ALTERTABLE_SET_WRITE_ORDER +POSTHOOK: Input: default@ice_orc_order +POSTHOOK: Output: default@ice_orc_order +PREHOOK: query: describe formatted ice_orc_order +PREHOOK: type: DESCTABLE +PREHOOK: Input: default@ice_orc_order +POSTHOOK: query: describe formatted ice_orc_order +POSTHOOK: type: DESCTABLE +POSTHOOK: Input: default@ice_orc_order +# col_name data_type comment +id int +name string +age int +city string + +# Detailed Table Information +Database: default +#### A masked pattern was here #### +Retention: 0 +#### A masked pattern was here #### +Table Type: EXTERNAL_TABLE +Table Parameters: + COLUMN_STATS_ACCURATE {\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"age\":\"true\",\"city\":\"true\",\"id\":\"true\",\"name\":\"true\"}} + EXTERNAL TRUE + bucketing_version 2 + current-schema {\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"id\",\"required\":false,\"type\":\"int\"},{\"id\":2,\"name\":\"name\",\"required\":false,\"type\":\"string\"},{\"id\":3,\"name\":\"age\",\"required\":false,\"type\":\"int\"},{\"id\":4,\"name\":\"city\",\"required\":false,\"type\":\"string\"}]} + default-sort-order {\"order-id\":1,\"fields\":[{\"transform\":\"identity\",\"source-id\":1,\"direction\":\"desc\",\"null-order\":\"nulls-first\"},{\"transform\":\"identity\",\"source-id\":2,\"direction\":\"asc\",\"null-order\":\"nulls-last\"}]} + format-version 2 + iceberg.orc.files.only true +#### A masked pattern was here #### + numFiles 0 + numRows 0 + parquet.compression zstd +#### A masked pattern was here #### + rawDataSize 0 + serialization.format 1 + snapshot-count 0 + storage_handler org.apache.iceberg.mr.hive.HiveIcebergStorageHandler + table_type ICEBERG + totalSize #Masked# +#### A masked pattern was here #### + uuid #Masked# + write.delete.mode merge-on-read + write.format.default orc + write.merge.mode merge-on-read + write.metadata.delete-after-commit.enabled true + write.update.mode merge-on-read + +# Storage Information +SerDe Library: org.apache.iceberg.mr.hive.HiveIcebergSerDe +InputFormat: org.apache.iceberg.mr.hive.HiveIcebergInputFormat +OutputFormat: org.apache.iceberg.mr.hive.HiveIcebergOutputFormat +Compressed: No +Sort Columns: [FieldSchema(name:id, type:int, comment:Transform: identity, Sort direction: DESC, Null sort order: NULLS_FIRST), FieldSchema(name:name, type:string, comment:Transform: identity, Sort direction: ASC, Null sort order: NULLS_LAST)] +PREHOOK: query: explain insert into ice_orc_order values (4, 'David', 28, 'Seattle'),(5, 'Eve', 32, 'Boston'),(6, 'Frank', 29, 'Austin'),(7, 'Grace', 32, 'Denver') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@ice_orc_order +POSTHOOK: query: explain insert into ice_orc_order values (4, 'David', 28, 'Seattle'),(5, 'Eve', 32, 'Boston'),(6, 'Frank', 29, 'Austin'),(7, 'Grace', 32, 'Denver') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@ice_orc_order +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-2 depends on stages: Stage-1 + Stage-0 depends on stages: Stage-2 + Stage-3 depends on stages: Stage-0 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Reducer 2 <- Map 1 (SIMPLE_EDGE) + Reducer 3 <- Map 1 (CUSTOM_SIMPLE_EDGE) +#### A masked 
pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: _dummy_table + Row Limit Per Split: 1 + Statistics: Num rows: 1 Data size: 10 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: array(const struct(4,'David',28,'Seattle'),const struct(5,'Eve',32,'Boston'),const struct(6,'Frank',29,'Austin'),const struct(7,'Grace',32,'Denver')) (type: array>) + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 72 Basic stats: COMPLETE Column stats: COMPLETE + UDTF Operator + Statistics: Num rows: 1 Data size: 72 Basic stats: COMPLETE Column stats: COMPLETE + function name: inline + Select Operator + expressions: col1 (type: int), col2 (type: string), col3 (type: int), col4 (type: string) + outputColumnNames: _col0, _col1, _col2, _col3 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: int), _col1 (type: string) + null sort order: az + sort order: -+ + Map-reduce partition columns: _col0 (type: int), _col1 (type: string) + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col0 (type: int), _col1 (type: string), _col2 (type: int), _col3 (type: string) + Select Operator + expressions: _col0 (type: int), _col1 (type: string), _col2 (type: int), _col3 (type: string) + outputColumnNames: id, name, age, city + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + Group By Operator + aggregations: min(id), max(id), count(1), count(id), compute_bit_vector_hll(id), max(length(name)), avg(COALESCE(length(name),0)), count(name), compute_bit_vector_hll(name), min(age), max(age), count(age), compute_bit_vector_hll(age), max(length(city)), avg(COALESCE(length(city),0)), count(city), compute_bit_vector_hll(city) + minReductionHashAggr: 0.4 + mode: hash + outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12, _col13, _col14, _col15, _col16 + Statistics: Num rows: 1 Data size: 792 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + null sort order: + sort order: + Statistics: Num rows: 1 Data size: 792 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: bigint), _col3 (type: bigint), _col4 (type: binary), _col5 (type: int), _col6 (type: struct), _col7 (type: bigint), _col8 (type: binary), _col9 (type: int), _col10 (type: int), _col11 (type: bigint), _col12 (type: binary), _col13 (type: int), _col14 (type: struct), _col15 (type: bigint), _col16 (type: binary) + Execution mode: llap + LLAP IO: no inputs + Reducer 2 + Execution mode: vectorized, llap + Reduce Operator Tree: + Select Operator + expressions: VALUE._col0 (type: int), VALUE._col1 (type: string), VALUE._col2 (type: int), VALUE._col3 (type: string), KEY._col0 (type: int), KEY._col1 (type: string) + outputColumnNames: _col0, _col1, _col2, _col3, _col0, _col1 + File Output Operator + compressed: false + Dp Sort State: PARTITION_SORTED + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + table: + input format: org.apache.iceberg.mr.hive.HiveIcebergInputFormat + output format: org.apache.iceberg.mr.hive.HiveIcebergOutputFormat + serde: org.apache.iceberg.mr.hive.HiveIcebergSerDe + name: default.ice_orc_order + Reducer 3 + Execution mode: vectorized, llap + Reduce Operator Tree: + Group By Operator + aggregations: min(VALUE._col0), max(VALUE._col1), 
count(VALUE._col2), count(VALUE._col3), compute_bit_vector_hll(VALUE._col4), max(VALUE._col5), avg(VALUE._col6), count(VALUE._col7), compute_bit_vector_hll(VALUE._col8), min(VALUE._col9), max(VALUE._col10), count(VALUE._col11), compute_bit_vector_hll(VALUE._col12), max(VALUE._col13), avg(VALUE._col14), count(VALUE._col15), compute_bit_vector_hll(VALUE._col16) + mode: mergepartial + outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12, _col13, _col14, _col15, _col16 + Statistics: Num rows: 1 Data size: 656 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: 'LONG' (type: string), UDFToLong(_col0) (type: bigint), UDFToLong(_col1) (type: bigint), (_col2 - _col3) (type: bigint), COALESCE(ndv_compute_bit_vector(_col4),0) (type: bigint), _col4 (type: binary), 'STRING' (type: string), UDFToLong(COALESCE(_col5,0)) (type: bigint), COALESCE(_col6,0) (type: double), (_col2 - _col7) (type: bigint), COALESCE(ndv_compute_bit_vector(_col8),0) (type: bigint), _col8 (type: binary), 'LONG' (type: string), UDFToLong(_col9) (type: bigint), UDFToLong(_col10) (type: bigint), (_col2 - _col11) (type: bigint), COALESCE(ndv_compute_bit_vector(_col12),0) (type: bigint), _col12 (type: binary), 'STRING' (type: string), UDFToLong(COALESCE(_col13,0)) (type: bigint), COALESCE(_col14,0) (type: double), (_col2 - _col15) (type: bigint), COALESCE(ndv_compute_bit_vector(_col16),0) (type: bigint), _col16 (type: binary) + outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12, _col13, _col14, _col15, _col16, _col17, _col18, _col19, _col20, _col21, _col22, _col23 + Statistics: Num rows: 1 Data size: 1060 Basic stats: COMPLETE Column stats: COMPLETE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 1060 Basic stats: COMPLETE Column stats: COMPLETE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-2 + Dependency Collection + + Stage: Stage-0 + Move Operator + tables: + replace: false + table: + input format: org.apache.iceberg.mr.hive.HiveIcebergInputFormat + output format: org.apache.iceberg.mr.hive.HiveIcebergOutputFormat + serde: org.apache.iceberg.mr.hive.HiveIcebergSerDe + name: default.ice_orc_order + + Stage: Stage-3 + Stats Work + Basic Stats Work: + Column Stats Desc: + Columns: id, name, age, city + Column Types: int, string, int, string + Table: default.ice_orc_order + +PREHOOK: query: insert into ice_orc_order values (4, 'David', 28, 'Seattle'),(5, 'Eve', 32, 'Boston'),(6, 'Frank', 29, 'Austin'),(7, 'Grace', 32, 'Denver') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@ice_orc_order +POSTHOOK: query: insert into ice_orc_order values (4, 'David', 28, 'Seattle'),(5, 'Eve', 32, 'Boston'),(6, 'Frank', 29, 'Austin'),(7, 'Grace', 32, 'Denver') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@ice_orc_order +PREHOOK: query: select * from ice_orc_order +PREHOOK: type: QUERY +PREHOOK: Input: default@ice_orc_order +#### A masked pattern was here #### +POSTHOOK: query: select * from ice_orc_order +POSTHOOK: type: QUERY +POSTHOOK: Input: default@ice_orc_order +#### A masked pattern was here #### +7 Grace 32 Denver +6 Frank 29 Austin +5 Eve 32 Boston +4 David 28 Seattle +PREHOOK: query: drop table 
ice_orc_order
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@ice_orc_order
+PREHOOK: Output: database:default
+PREHOOK: Output: default@ice_orc_order
+POSTHOOK: query: drop table ice_orc_order
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@ice_orc_order
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@ice_orc_order
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index a1ea7acea44f..92e80e6a822d 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -418,6 +418,7 @@ erasurecoding.only.query.files=\
 iceberg.llap.query.files=\
   hadoop_catalog_create_table.q,\
+  iceberg_alter_locally_ordered_table.q,\
   iceberg_alter_locally_zordered_table.q,\
   iceberg_bucket_map_join_1.q,\
   iceberg_bucket_map_join_2.q,\
@@ -467,6 +468,7 @@ iceberg.llap.query.rest.gravitino.files=\
 iceberg.llap.only.query.files=\
   hadoop_catalog_create_table.q,\
+  iceberg_alter_locally_ordered_table.q,\
   iceberg_alter_locally_zordered_table.q,\
   iceberg_bucket_map_join_1.q,\
   iceberg_bucket_map_join_2.q,\
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/misc/sortoder/SortOrderUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/misc/sortoder/SortOrderUtils.java
new file mode 100644
index 000000000000..3063f7b46e74
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/misc/sortoder/SortOrderUtils.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.ddl.misc.sortoder;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hadoop.hive.ql.ddl.misc.sortoder.SortFieldDesc.SortDirection;
+import org.apache.hadoop.hive.ql.parse.ASTNode;
+import org.apache.hadoop.hive.ql.parse.HiveParser;
+import org.apache.hadoop.hive.ql.util.NullOrdering;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.unescapeIdentifier;
+
+/**
+ * Utility class for parsing and serializing sort order specifications.
+ * Used by both CREATE TABLE and ALTER TABLE commands to avoid code duplication.
+ */
+public final class SortOrderUtils {
+  private static final Logger LOG = LoggerFactory.getLogger(SortOrderUtils.class);
+  private static final ObjectMapper JSON_OBJECT_MAPPER = new ObjectMapper();
+
+  private SortOrderUtils() {
+    throw new UnsupportedOperationException("SortOrderUtils should not be instantiated!");
+  }
+
+  /**
+   * Parses an AST node containing sort column specifications and converts to JSON.
+   * The AST node should contain children of type TOK_TABSORTCOLNAMEASC or TOK_TABSORTCOLNAMEDESC.
+   *
+   * @param ast AST node with sort columns
+   * @return JSON string representation of SortFields, or null if parsing fails or AST is empty
+   */
+  public static String parseSortOrderToJson(ASTNode ast) {
+    if (ast == null || ast.getChildCount() == 0) {
+      return null;
+    }
+
+    List<SortFieldDesc> sortFieldDescList = new ArrayList<>();
+
+    for (int i = 0; i < ast.getChildCount(); i++) {
+      ASTNode child = (ASTNode) ast.getChild(i);
+
+      // Determine sort direction from token type
+      SortDirection sortDirection =
+          child.getToken().getType() == HiveParser.TOK_TABSORTCOLNAMEDESC
+              ? SortDirection.DESC
+              : SortDirection.ASC;
+
+      // Get column spec node
+      ASTNode colSpecNode = (ASTNode) child.getChild(0);
+      String columnName = unescapeIdentifier(colSpecNode.getChild(0).getText()).toLowerCase();
+      NullOrdering nullOrder = NullOrdering.fromToken(colSpecNode.getToken().getType());
+
+      sortFieldDescList.add(new SortFieldDesc(columnName, sortDirection, nullOrder));
+    }
+
+    if (sortFieldDescList.isEmpty()) {
+      return null;
+    }
+
+    try {
+      SortFields sortFields = new SortFields(sortFieldDescList);
+      return JSON_OBJECT_MAPPER.writeValueAsString(sortFields);
+    } catch (JsonProcessingException e) {
+      LOG.warn("Failed to serialize sort order specification", e);
+      return null;
+    }
+  }
+}
+
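The JSON that `parseSortOrderToJson` emits is easiest to see by skipping the AST walk and serializing a hand-built descriptor list with the same Jackson mapper. A minimal sketch, assuming only the `SortFieldDesc` and `SortFields` constructors already used in this patch (the exact JSON property names are whatever those beans expose):

```java
import java.util.ArrayList;
import java.util.List;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.hive.ql.ddl.misc.sortoder.SortFieldDesc;
import org.apache.hadoop.hive.ql.ddl.misc.sortoder.SortFields;
import org.apache.hadoop.hive.ql.util.NullOrdering;

// Serializes the descriptor list for: id DESC NULLS FIRST, name ASC NULLS LAST
public class SortFieldsJsonSketch {
  public static void main(String[] args) throws Exception {
    List<SortFieldDesc> fields = new ArrayList<>();
    fields.add(new SortFieldDesc("id", SortFieldDesc.SortDirection.DESC, NullOrdering.NULLS_FIRST));
    fields.add(new SortFieldDesc("name", SortFieldDesc.SortDirection.ASC, NullOrdering.NULLS_LAST));

    // Same serialization step as parseSortOrderToJson, minus the ASTNode traversal.
    System.out.println(new ObjectMapper().writeValueAsString(new SortFields(fields)));
  }
}
```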
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/create/CreateTableAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/create/CreateTableAnalyzer.java
index 74273f780cf4..49e5b5020f07 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/create/CreateTableAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/create/CreateTableAnalyzer.java
@@ -53,8 +53,7 @@
 import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.ddl.DDLSemanticAnalyzerFactory.DDLType;
 import org.apache.hadoop.hive.ql.ddl.DDLWork;
-import org.apache.hadoop.hive.ql.ddl.misc.sortoder.SortFieldDesc;
-import org.apache.hadoop.hive.ql.ddl.misc.sortoder.SortFields;
+import org.apache.hadoop.hive.ql.ddl.misc.sortoder.SortOrderUtils;
 import org.apache.hadoop.hive.ql.ddl.misc.sortoder.ZOrderFieldDesc;
 import org.apache.hadoop.hive.ql.ddl.misc.sortoder.ZOrderFields;
 import org.apache.hadoop.hive.ql.ddl.table.constraint.ConstraintsUtils;
@@ -79,7 +78,6 @@
 import org.apache.hadoop.hive.ql.plan.HiveOperation;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionStateUtil;
-import org.apache.hadoop.hive.ql.util.NullOrdering;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.base.Strings;
@@ -171,24 +169,7 @@ private boolean hasConstraints(final List partCols, final List
-    List<SortFieldDesc> sortFieldDescList = new ArrayList<>();
-    SortFields sortFields = new SortFields(sortFieldDescList);
-    for (int i = 0; i < ast.getChildCount(); i++) {
-      ASTNode child = (ASTNode) ast.getChild(i);
-      SortFieldDesc.SortDirection sortDirection =
-          child.getToken().getType() == HiveParser.TOK_TABSORTCOLNAMEDESC ? SortFieldDesc.SortDirection.DESC
-              : SortFieldDesc.SortDirection.ASC;
-      child = (ASTNode) child.getChild(0);
-      String name = unescapeIdentifier(child.getChild(0).getText()).toLowerCase();
-      NullOrdering nullOrder = NullOrdering.fromToken(child.getToken().getType());
-      sortFieldDescList.add(new SortFieldDesc(name, sortDirection, nullOrder));
-    }
-    try {
-      return JSON_OBJECT_MAPPER.writer().writeValueAsString(sortFields);
-    } catch (JsonProcessingException e) {
-      LOG.warn("Can not create write order json. ", e);
-      return null;
-    }
+    return SortOrderUtils.parseSortOrderToJson(ast);
   }
 
   /**
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/order/AlterTableSetWriteOrderAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/order/AlterTableSetWriteOrderAnalyzer.java
index 1aade613e77d..56443fbeaf73 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/order/AlterTableSetWriteOrderAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/order/AlterTableSetWriteOrderAnalyzer.java
@@ -27,6 +27,7 @@
 import org.apache.hadoop.hive.ql.ddl.DDLUtils;
 import org.apache.hadoop.hive.ql.ddl.DDLWork;
 import org.apache.hadoop.hive.ql.ddl.DDLSemanticAnalyzerFactory.DDLType;
+import org.apache.hadoop.hive.ql.ddl.misc.sortoder.SortOrderUtils;
 import org.apache.hadoop.hive.ql.ddl.table.AbstractAlterTableAnalyzer;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.metadata.Table;
@@ -36,7 +37,7 @@
 /**
  * Analyzer for ALTER TABLE ... SET WRITE [LOCALLY] ORDERED BY commands.
- * Currently supports Z-ORDER only. Regular ORDERED BY support will be added in a future commit.
+ * Supports both Z-ORDER and Natural Order for Iceberg tables.
  */
 @DDLType(types = HiveParser.TOK_ALTERTABLE_SET_WRITE_ORDER)
 public class AlterTableSetWriteOrderAnalyzer extends AbstractAlterTableAnalyzer {
@@ -56,32 +57,70 @@ protected void analyzeCommand(TableName tableName, Map partition
     ASTNode orderNode = (ASTNode) command.getChild(0);
     if (orderNode.getType() == HiveParser.TOK_WRITE_LOCALLY_ORDERED_BY_ZORDER) {
       // Handle Z-ORDER
-      ASTNode columnListNode = (ASTNode) orderNode.getChild(0);
-      List<String> columnNames = new ArrayList<>();
-      for (int i = 0; i < columnListNode.getChildCount(); i++) {
-        ASTNode child = (ASTNode) columnListNode.getChild(i);
-        columnNames.add(unescapeIdentifier(child.getText()).toLowerCase());
-      }
-
-      if (columnNames.isEmpty()) {
-        throw new SemanticException("Z-order requires at least one column");
-      }
-
-      // Set Z-order properties in table props sort.order=ZORDER and sort.columns=col1,col2,...
-      Map<String, String> props = Map.of(
-          "sort.order", "ZORDER",
-          "sort.columns", String.join(",", columnNames)
-      );
-
-      AlterTableSetWriteOrderDesc desc = new AlterTableSetWriteOrderDesc(tableName, partitionSpec, props);
-      addInputsOutputsAlterTable(tableName, partitionSpec, desc, desc.getType(), false);
-
-      rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), desc)));
+      handleZOrder(tableName, partitionSpec, orderNode);
     } else if (orderNode.getType() == HiveParser.TOK_WRITE_LOCALLY_ORDERED) {
-      // Regular ORDERED BY - to be implemented in future commit
-      throw new SemanticException("Regular ORDERED BY is not yet supported. Only ZORDER is supported.");
+      // Handle natural ORDERED BY
+      handleNaturalOrder(tableName, partitionSpec, orderNode);
     } else {
       throw new SemanticException("Unexpected token type: " + orderNode.getType());
     }
   }
+
+  /**
+   * Handles Z-ORDER syntax: ALTER TABLE ... SET WRITE ORDERED BY ZORDER(col1, col2, ...)
+   */
+  private void handleZOrder(TableName tableName, Map<String, String> partitionSpec, ASTNode orderNode)
+      throws SemanticException {
+    ASTNode columnListNode = (ASTNode) orderNode.getChild(0);
+    List<String> columnNames = new ArrayList<>();
+    for (int i = 0; i < columnListNode.getChildCount(); i++) {
+      ASTNode child = (ASTNode) columnListNode.getChild(i);
+      columnNames.add(unescapeIdentifier(child.getText()).toLowerCase());
+    }
+
+    if (columnNames.isEmpty()) {
+      throw new SemanticException("Z-order requires at least one column");
+    }
+
+    // Set Z-order properties: sort.order=ZORDER and sort.columns=col1,col2,...
+    Map<String, String> props = Map.of(
+        "sort.order", "ZORDER",
+        "sort.columns", String.join(",", columnNames)
+    );
+
+    createAndAddTask(tableName, partitionSpec, props);
+  }
+
+  /**
+   * Handles regular ORDERED BY syntax: ALTER TABLE ... SET WRITE ORDERED BY (col1 ASC, col2 DESC NULLS LAST, ...)
+   * Creates a Hive-native SortFields JSON that will be converted to Iceberg format by the metahook.
+   */
+  private void handleNaturalOrder(TableName tableName, Map<String, String> partitionSpec, ASTNode orderNode)
+      throws SemanticException {
+    ASTNode sortColumnListNode = (ASTNode) orderNode.getChild(0);
+
+    // Parse and serialize to JSON using the utility
+    String sortOrderJson = SortOrderUtils.parseSortOrderToJson(sortColumnListNode);
+    if (sortOrderJson == null) {
+      throw new SemanticException("Failed to serialize sort order specification");
+    }
+
+    // Set the sort order JSON in table properties
+    // The metahook will detect this and convert to Iceberg format
+    Map<String, String> props = Map.of(
+        "default-sort-order", sortOrderJson
+    );
+
+    createAndAddTask(tableName, partitionSpec, props);
+  }
+
+  /**
+   * Creates the DDL descriptor, sets up inputs/outputs, and adds the task to rootTasks.
+   */
+  private void createAndAddTask(TableName tableName, Map<String, String> partitionSpec,
+      Map<String, String> props) throws SemanticException {
+    AlterTableSetWriteOrderDesc desc = new AlterTableSetWriteOrderDesc(tableName, partitionSpec, props);
+    addInputsOutputsAlterTable(tableName, partitionSpec, desc, desc.getType(), false);
+    rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), desc)));
+  }
 }
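Both analyzer paths end the same way: a small property map is handed to `AlterTableSetWriteOrderDesc` and applied as an ADDPROPS-style alteration. A sketch of the two payloads, with a placeholder standing in for the JSON returned by `SortOrderUtils.parseSortOrderToJson`:

```java
import java.util.Map;

public class WriteOrderPropsSketch {
  public static void main(String[] args) {
    // ALTER TABLE t SET WRITE ORDERED BY ZORDER(id, name)
    Map<String, String> zorderProps = Map.of(
        "sort.order", "ZORDER",
        "sort.columns", "id,name");

    // ALTER TABLE t SET WRITE ORDERED BY id DESC NULLS FIRST, name ASC NULLS LAST
    String sortOrderJson = "{...SortFields JSON from SortOrderUtils.parseSortOrderToJson...}";
    Map<String, String> naturalOrderProps = Map.of("default-sort-order", sortOrderJson);

    System.out.println(zorderProps);
    System.out.println(naturalOrderProps);
  }
}
```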
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/order/AlterTableSetWriteOrderDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/order/AlterTableSetWriteOrderDesc.java
index 2b46feacf84f..2182801c3f7b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/order/AlterTableSetWriteOrderDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/order/AlterTableSetWriteOrderDesc.java
@@ -21,6 +21,7 @@
 import java.util.Map;
 
 import org.apache.hadoop.hive.common.TableName;
+import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
 import org.apache.hadoop.hive.ql.ddl.table.AbstractAlterTableDesc;
 import org.apache.hadoop.hive.ql.ddl.table.AlterTableType;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
@@ -33,10 +34,18 @@
 @Explain(displayName = "Set Write Order", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
 public class AlterTableSetWriteOrderDesc extends AbstractAlterTableDesc {
   private static final long serialVersionUID = 1L;
+
+  private final EnvironmentContext environmentContext;
 
   public AlterTableSetWriteOrderDesc(TableName tableName, Map<String, String> partitionSpec,
       Map<String, String> props) throws SemanticException {
     super(AlterTableType.ADDPROPS, tableName, partitionSpec, null, false, false, props);
+    this.environmentContext = new EnvironmentContext();
+  }
+
+  @Override
+  public EnvironmentContext getEnvironmentContext() {
+    return environmentContext;
   }
 
   @Override