Skip to content

Commit f35e6f3

Browse files
committed
HIVE-29345: Support the ALTER TABLE command for write ordering.
SYNTAX: ALTER TABLE table_name SET WRITE ORDERED BY column_name sort_direction NULLS FIRST/LAST, ... EXAMPLE: ALTER TABLE table_order SET WRITE ORDERED BY id DESC NULLS FIRST, name ASC NULLS LAST;
1 parent 5446484 commit f35e6f3

File tree

9 files changed

+573
-63
lines changed

9 files changed

+573
-63
lines changed

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/BaseHiveIcebergMetaHook.java

Lines changed: 48 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@
5151
import org.apache.iceberg.PartitionSpecParser;
5252
import org.apache.iceberg.Schema;
5353
import org.apache.iceberg.SchemaParser;
54-
import org.apache.iceberg.SortDirection;
5554
import org.apache.iceberg.SortOrder;
5655
import org.apache.iceberg.SortOrderParser;
5756
import org.apache.iceberg.Table;
@@ -251,21 +250,16 @@ private void setSortOrder(org.apache.hadoop.hive.metastore.api.Table hmsTable, S
251250
return;
252251
}
253252

254-
try {
255-
SortFields sortFields = JSON_OBJECT_MAPPER.reader().readValue(sortOrderJSONString, SortFields.class);
256-
if (sortFields != null && !sortFields.getSortFields().isEmpty()) {
257-
SortOrder.Builder sortOrderBuilder = SortOrder.builderFor(schema);
258-
sortFields.getSortFields().forEach(fieldDesc -> {
259-
NullOrder nullOrder = fieldDesc.getNullOrdering() == NullOrdering.NULLS_FIRST ?
260-
NullOrder.NULLS_FIRST : NullOrder.NULLS_LAST;
261-
SortDirection sortDirection = fieldDesc.getDirection() == SortFieldDesc.SortDirection.ASC ?
262-
SortDirection.ASC : SortDirection.DESC;
263-
sortOrderBuilder.sortBy(fieldDesc.getColumnName(), sortDirection, nullOrder);
264-
});
265-
properties.put(TableProperties.DEFAULT_SORT_ORDER, SortOrderParser.toJson(sortOrderBuilder.build()));
266-
}
267-
} catch (Exception e) {
268-
LOG.warn("Can not read write order json: {}", sortOrderJSONString);
253+
List<SortFieldDesc> sortFieldDescList = parseSortFieldsJSON(sortOrderJSONString);
254+
if (sortFieldDescList != null) {
255+
SortOrder.Builder sortOrderBuilder = SortOrder.builderFor(schema);
256+
sortFieldDescList.forEach(fieldDesc ->
257+
sortOrderBuilder.sortBy(
258+
fieldDesc.getColumnName(),
259+
convertSortDirection(fieldDesc.getDirection()),
260+
convertNullOrder(fieldDesc.getNullOrdering()))
261+
);
262+
properties.put(TableProperties.DEFAULT_SORT_ORDER, SortOrderParser.toJson(sortOrderBuilder.build()));
269263
}
270264
}
271265

@@ -304,6 +298,44 @@ private boolean isZOrderJSON(String jsonString) {
304298
}
305299
}
306300

301+
/**
302+
* Parses Hive SortFields JSON and returns the list of sort field descriptors.
303+
* This is a common utility method used by both CREATE TABLE and ALTER TABLE flows.
304+
*
305+
* @param sortOrderJSONString The JSON string containing Hive SortFields
306+
* @return List of SortFieldDesc, or null if parsing fails or JSON is empty
307+
*/
308+
protected List<SortFieldDesc> parseSortFieldsJSON(String sortOrderJSONString) {
309+
if (Strings.isNullOrEmpty(sortOrderJSONString)) {
310+
return null;
311+
}
312+
313+
try {
314+
SortFields sortFields = JSON_OBJECT_MAPPER.reader().readValue(sortOrderJSONString, SortFields.class);
315+
if (sortFields != null && !sortFields.getSortFields().isEmpty()) {
316+
return sortFields.getSortFields();
317+
}
318+
} catch (Exception e) {
319+
LOG.warn("Failed to parse sort order JSON: {}", sortOrderJSONString, e);
320+
}
321+
return null;
322+
}
323+
324+
/**
325+
* Converts Hive NullOrdering to Iceberg NullOrder.
326+
*/
327+
protected static NullOrder convertNullOrder(NullOrdering nullOrdering) {
328+
return nullOrdering == NullOrdering.NULLS_FIRST ? NullOrder.NULLS_FIRST : NullOrder.NULLS_LAST;
329+
}
330+
331+
/**
332+
* Converts Hive SortDirection to Iceberg SortDirection.
333+
*/
334+
private static org.apache.iceberg.SortDirection convertSortDirection(SortFieldDesc.SortDirection direction) {
335+
return direction == SortFieldDesc.SortDirection.ASC ?
336+
org.apache.iceberg.SortDirection.ASC : org.apache.iceberg.SortDirection.DESC;
337+
}
338+
307339
@Override
308340
public void rollbackCreateTable(org.apache.hadoop.hive.metastore.api.Table hmsTable) {
309341
// do nothing

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java

Lines changed: 58 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.util.Map;
3030
import java.util.Objects;
3131
import java.util.Optional;
32+
import java.util.function.Consumer;
3233
import java.util.function.Function;
3334
import java.util.stream.Collectors;
3435
import java.util.stream.Stream;
@@ -54,6 +55,7 @@
5455
import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
5556
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
5657
import org.apache.hadoop.hive.ql.QueryState;
58+
import org.apache.hadoop.hive.ql.ddl.misc.sortoder.SortFieldDesc;
5759
import org.apache.hadoop.hive.ql.ddl.table.AlterTableType;
5860
import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
5961
import org.apache.hadoop.hive.ql.io.AcidUtils;
@@ -85,13 +87,17 @@
8587
import org.apache.iceberg.FileScanTask;
8688
import org.apache.iceberg.MetadataTableType;
8789
import org.apache.iceberg.MetadataTableUtils;
90+
import org.apache.iceberg.NullOrder;
8891
import org.apache.iceberg.PartitionData;
8992
import org.apache.iceberg.PartitionField;
9093
import org.apache.iceberg.PartitionSpec;
9194
import org.apache.iceberg.PartitionSpecParser;
9295
import org.apache.iceberg.PartitionsTable;
96+
import org.apache.iceberg.ReplaceSortOrder;
9397
import org.apache.iceberg.Schema;
9498
import org.apache.iceberg.SchemaParser;
99+
import org.apache.iceberg.SortOrder;
100+
import org.apache.iceberg.SortOrderParser;
95101
import org.apache.iceberg.Table;
96102
import org.apache.iceberg.TableMetadata;
97103
import org.apache.iceberg.TableMetadataParser;
@@ -624,15 +630,65 @@ private void alterTableProperties(org.apache.hadoop.hive.metastore.api.Table hms
624630
Map<String, String> hmsTableParameters = hmsTable.getParameters();
625631
Splitter splitter = Splitter.on(PROPERTIES_SEPARATOR);
626632
UpdateProperties icebergUpdateProperties = icebergTable.updateProperties();
633+
627634
if (contextProperties.containsKey(SET_PROPERTIES)) {
628-
splitter.splitToList(contextProperties.get(SET_PROPERTIES))
629-
.forEach(k -> icebergUpdateProperties.set(k, hmsTableParameters.get(k)));
635+
List<String> propertiesToSet = splitter.splitToList(contextProperties.get(SET_PROPERTIES));
636+
637+
// Define handlers for properties that need special processing
638+
Map<String, Consumer<String>> propertyHandlers = Maps.newHashMap();
639+
propertyHandlers.put(TableProperties.DEFAULT_SORT_ORDER,
640+
key -> handleDefaultSortOrder(hmsTable, hmsTableParameters));
641+
642+
// Process each property using handlers or default behavior
643+
propertiesToSet.forEach(key ->
644+
propertyHandlers.getOrDefault(key,
645+
k -> icebergUpdateProperties.set(k, hmsTableParameters.get(k))
646+
).accept(key)
647+
);
630648
} else if (contextProperties.containsKey(UNSET_PROPERTIES)) {
631649
splitter.splitToList(contextProperties.get(UNSET_PROPERTIES)).forEach(icebergUpdateProperties::remove);
632650
}
651+
633652
icebergUpdateProperties.commit();
634653
}
635654

655+
/**
656+
* Handles conversion of Hive SortFields JSON to Iceberg SortOrder.
657+
* Uses Iceberg's replaceSortOrder() API to properly handle the reserved property.
658+
*/
659+
private void handleDefaultSortOrder(org.apache.hadoop.hive.metastore.api.Table hmsTable,
660+
Map<String, String> hmsTableParameters) {
661+
String sortOrderJSONString = hmsTableParameters.get(TableProperties.DEFAULT_SORT_ORDER);
662+
663+
List<SortFieldDesc> sortFieldDescList = parseSortFieldsJSON(sortOrderJSONString);
664+
if (sortFieldDescList != null) {
665+
try {
666+
ReplaceSortOrder replaceSortOrder = icebergTable.replaceSortOrder();
667+
668+
// Chain all the sort field additions
669+
for (SortFieldDesc fieldDesc : sortFieldDescList) {
670+
NullOrder nullOrder = convertNullOrder(fieldDesc.getNullOrdering());
671+
672+
if (fieldDesc.getDirection() == SortFieldDesc.SortDirection.ASC) {
673+
replaceSortOrder.asc(fieldDesc.getColumnName(), nullOrder);
674+
} else {
675+
replaceSortOrder.desc(fieldDesc.getColumnName(), nullOrder);
676+
}
677+
}
678+
679+
replaceSortOrder.commit();
680+
681+
// Update HMS table parameters with the Iceberg SortOrder JSON
682+
SortOrder newSortOrder = icebergTable.sortOrder();
683+
hmsTableParameters.put(TableProperties.DEFAULT_SORT_ORDER, SortOrderParser.toJson(newSortOrder));
684+
685+
LOG.info("Successfully set sort order for table {}: {}", hmsTable.getTableName(), newSortOrder);
686+
} catch (Exception e) {
687+
LOG.warn("Failed to apply sort order for table {}: {}", hmsTable.getTableName(), sortOrderJSONString, e);
688+
}
689+
}
690+
}
691+
636692
private void setupAlterOperationType(org.apache.hadoop.hive.metastore.api.Table hmsTable,
637693
EnvironmentContext context) throws MetaException {
638694
TableName tableName = new TableName(hmsTable.getCatName(), hmsTable.getDbName(), hmsTable.getTableName());
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
-- Mask neededVirtualColumns due to non-strict order
2+
--! qt:replace:/(\s+neededVirtualColumns:\s)(.*)/$1#Masked#/
3+
-- Mask the totalSize value as it can have slight variability, causing test flakiness
4+
--! qt:replace:/(\s+totalSize\s+)\S+(\s+)/$1#Masked#$2/
5+
-- Mask random uuid
6+
--! qt:replace:/(\s+uuid\s+)\S+(\s*)/$1#Masked#$2/
7+
-- Mask a random snapshot id
8+
--! qt:replace:/(\s+current-snapshot-id\s+)\S+(\s*)/$1#Masked#/
9+
-- Mask added file size
10+
--! qt:replace:/(\S\"added-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
11+
-- Mask total file size
12+
--! qt:replace:/(\S\"total-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
13+
-- Mask removed file size
14+
--! qt:replace:/(\S\"removed-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
15+
-- Mask current-snapshot-timestamp-ms
16+
--! qt:replace:/(\s+current-snapshot-timestamp-ms\s+)\S+(\s*)/$1#Masked#$2/
17+
--! qt:replace:/(MAJOR\s+succeeded\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/
18+
-- Mask iceberg version
19+
--! qt:replace:/(\S\"iceberg-version\\\":\\\")(\w+\s\w+\s\d+\.\d+\.\d+\s\(\w+\s\w+\))(\\\")/$1#Masked#$3/
20+
set hive.vectorized.execution.enabled=true;
21+
22+
-- Test ALTER TABLE SET WRITE [LOCALLY] ORDERED BY
23+
24+
create table ice_orc_order (id int, name string, age int, city string) stored by iceberg stored as orc;
25+
describe formatted ice_orc_order;
26+
27+
alter table ice_orc_order set write ordered by id desc nulls first, name asc nulls last;
28+
describe formatted ice_orc_order;
29+
30+
explain insert into ice_orc_order values (4, 'David', 28, 'Seattle'),(5, 'Eve', 32, 'Boston'),(6, 'Frank', 29, 'Austin'),(7, 'Grace', 32, 'Denver');
31+
insert into ice_orc_order values (4, 'David', 28, 'Seattle'),(5, 'Eve', 32, 'Boston'),(6, 'Frank', 29, 'Austin'),(7, 'Grace', 32, 'Denver');
32+
select * from ice_orc_order;
33+
34+
drop table ice_orc_order;

0 commit comments

Comments
 (0)