Skip to content

Commit 3d58586

Browse files
authored
HIVE-29372: Iceberg: [V3] Fix inconsistencies when vectorizartion is enabled in case of variant shredding. (#6245)
1 parent 90bf407 commit 3d58586

File tree

5 files changed

+84
-10
lines changed

5 files changed

+84
-10
lines changed

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,7 @@
187187
import org.apache.iceberg.mr.hive.actions.HiveIcebergDeleteOrphanFiles;
188188
import org.apache.iceberg.mr.hive.plan.IcebergBucketFunction;
189189
import org.apache.iceberg.mr.hive.udf.GenericUDFIcebergZorder;
190+
import org.apache.iceberg.parquet.VariantUtil;
190191
import org.apache.iceberg.puffin.Blob;
191192
import org.apache.iceberg.puffin.BlobMetadata;
192193
import org.apache.iceberg.puffin.Puffin;
@@ -1751,7 +1752,8 @@ private void fallbackToNonVectorizedModeBasedOnProperties(Properties tableProps)
17511752
if (FileFormat.AVRO == IcebergTableUtil.defaultFileFormat(tableProps::getProperty) ||
17521753
isValidMetadataTable(tableProps.getProperty(IcebergAcidUtil.META_TABLE_PROPERTY)) ||
17531754
hasOrcTimeInSchema(tableProps, tableSchema) ||
1754-
!hasParquetNestedTypeWithinListOrMap(tableProps, tableSchema)) {
1755+
!hasParquetNestedTypeWithinListOrMap(tableProps, tableSchema) ||
1756+
VariantUtil.shouldUseVariantShredding(tableProps::getProperty, tableSchema)) {
17551757
// disable vectorization
17561758
SessionStateUtil.getQueryState(conf).ifPresent(queryState ->
17571759
queryState.getConf().setBoolVar(ConfVars.HIVE_VECTORIZATION_ENABLED, false));

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveFileWriterFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ protected void configurePositionDelete(Avro.DeleteWriteBuilder builder) {
8686
protected void configureDataWrite(Parquet.DataWriteBuilder builder) {
8787
builder.createWriterFunc(GenericParquetWriter::create);
8888
// Configure variant shredding if enabled and a sample record is available
89-
if (VariantUtil.shouldUseVariantShredding(properties, dataSchema())) {
89+
if (VariantUtil.shouldUseVariantShredding(properties::get, dataSchema())) {
9090
setVariantShreddingFunc(builder, VariantUtil.variantShreddingFunc(sampleRecord, dataSchema()));
9191
}
9292
}

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/parquet/VariantUtil.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import java.util.List;
2323
import java.util.Map;
24+
import java.util.function.UnaryOperator;
2425
import org.apache.iceberg.Accessor;
2526
import org.apache.iceberg.Schema;
2627
import org.apache.iceberg.StructLike;
@@ -59,8 +60,8 @@ public record VariantField(int fieldId, Accessor<StructLike> accessor, String[]
5960
/**
6061
* Check if variant shredding is enabled via table properties.
6162
*/
62-
public static boolean isVariantShreddingEnabled(Map<String, String> properties) {
63-
String shreddingEnabled = properties.get(InputFormatConfig.VARIANT_SHREDDING_ENABLED);
63+
public static boolean isVariantShreddingEnabled(UnaryOperator<String> propertyLookup) {
64+
String shreddingEnabled = propertyLookup.apply(InputFormatConfig.VARIANT_SHREDDING_ENABLED);
6465
return Boolean.parseBoolean(shreddingEnabled);
6566
}
6667

@@ -73,7 +74,7 @@ public static boolean isShreddable(Object value) {
7374

7475
public static List<VariantField> variantFieldsForShredding(
7576
Map<String, String> properties, Schema schema) {
76-
if (!isVariantShreddingEnabled(properties)) {
77+
if (!isVariantShreddingEnabled(properties::get)) {
7778
return List.of();
7879
}
7980
return variantFieldsForShredding(schema);
@@ -89,8 +90,8 @@ private static List<VariantField> variantFieldsForShredding(Schema schema) {
8990
return results;
9091
}
9192

92-
public static boolean shouldUseVariantShredding(Map<String, String> properties, Schema schema) {
93-
return isVariantShreddingEnabled(properties) && hasVariantFields(schema);
93+
public static boolean shouldUseVariantShredding(UnaryOperator<String> propertyLookup, Schema schema) {
94+
return isVariantShreddingEnabled(propertyLookup) && hasVariantFields(schema);
9495
}
9596

9697
private static boolean hasVariantFields(Schema schema) {

iceberg/iceberg-handler/src/test/queries/positive/variant_type_shredding.q

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,30 @@ INSERT INTO tbl_shredded_variant VALUES
2727
(2, parse_json('{"name": "Bill", "active": false}')),
2828
(3, parse_json('{"name": "Henry", "age": 20}'));
2929

30-
-- Disable vectorized execution until Variant type is supported
31-
set hive.vectorized.execution.enabled=false;
32-
3330
-- Retrieve and verify
3431
SELECT id, try_variant_get(data, '$.name') FROM tbl_shredded_variant
3532
WHERE variant_get(data, '$.age') > 25;
3633

3734
EXPLAIN
3835
SELECT id, try_variant_get(data, '$.name') FROM tbl_shredded_variant
3936
WHERE variant_get(data, '$.age') > 25;
37+
38+
CREATE TABLE t (
39+
id INT,
40+
v VARIANT
41+
)
42+
STORED BY ICEBERG
43+
TBLPROPERTIES (
44+
'format-version'='3',
45+
'variant.shredding.enabled'='true'
46+
);
47+
48+
INSERT INTO t VALUES
49+
(1, parse_json('{"a": 1}')),
50+
(2, parse_json('{"b": 2}'));
51+
52+
SELECT
53+
try_variant_get(v, '$.a'),
54+
try_variant_get(v, '$.b')
55+
FROM t
56+
ORDER BY id;

iceberg/iceberg-handler/src/test/results/positive/variant_type_shredding.q.out

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,3 +99,57 @@ STAGE PLANS:
9999
Processor Tree:
100100
ListSink
101101

102+
PREHOOK: query: CREATE TABLE t (
103+
id INT,
104+
v VARIANT
105+
)
106+
STORED BY ICEBERG
107+
TBLPROPERTIES (
108+
'format-version'='3',
109+
'variant.shredding.enabled'='true'
110+
)
111+
PREHOOK: type: CREATETABLE
112+
PREHOOK: Output: database:default
113+
PREHOOK: Output: default@t
114+
POSTHOOK: query: CREATE TABLE t (
115+
id INT,
116+
v VARIANT
117+
)
118+
STORED BY ICEBERG
119+
TBLPROPERTIES (
120+
'format-version'='3',
121+
'variant.shredding.enabled'='true'
122+
)
123+
POSTHOOK: type: CREATETABLE
124+
POSTHOOK: Output: database:default
125+
POSTHOOK: Output: default@t
126+
PREHOOK: query: INSERT INTO t VALUES
127+
(1, parse_json('{"a": 1}')),
128+
(2, parse_json('{"b": 2}'))
129+
PREHOOK: type: QUERY
130+
PREHOOK: Input: _dummy_database@_dummy_table
131+
PREHOOK: Output: default@t
132+
POSTHOOK: query: INSERT INTO t VALUES
133+
(1, parse_json('{"a": 1}')),
134+
(2, parse_json('{"b": 2}'))
135+
POSTHOOK: type: QUERY
136+
POSTHOOK: Input: _dummy_database@_dummy_table
137+
POSTHOOK: Output: default@t
138+
PREHOOK: query: SELECT
139+
try_variant_get(v, '$.a'),
140+
try_variant_get(v, '$.b')
141+
FROM t
142+
ORDER BY id
143+
PREHOOK: type: QUERY
144+
PREHOOK: Input: default@t
145+
PREHOOK: Output: hdfs://### HDFS PATH ###
146+
POSTHOOK: query: SELECT
147+
try_variant_get(v, '$.a'),
148+
try_variant_get(v, '$.b')
149+
FROM t
150+
ORDER BY id
151+
POSTHOOK: type: QUERY
152+
POSTHOOK: Input: default@t
153+
POSTHOOK: Output: hdfs://### HDFS PATH ###
154+
1 NULL
155+
NULL 2

0 commit comments

Comments
 (0)