
Commit d61a151

Fix data partitioning based on time/timestamp* columns with millisecond precision
1 parent 6a4703c commit d61a151
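
Background for the fix, as a minimal standalone sketch (not part of the commit): Iceberg stores time and timestamp values in microseconds, so Parquet column statistics annotated as TIMESTAMP(MILLIS) have to be scaled by 1000 before a partition transform can be applied to them. The class name, column name, and sample value below are hypothetical.

import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Types;

public final class MillisToMicrosSketch {

  public static void main(String[] args) {
    // Hypothetical column: epoch milliseconds stored as INT64 with a TIMESTAMP(MILLIS) annotation.
    PrimitiveType tsMillis =
        Types.required(PrimitiveType.PrimitiveTypeName.INT64)
            .as(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
            .named("event_ts");

    long statValue = 1_700_000_000_123L; // a min/max statistic in epoch millis
    System.out.println(toIcebergMicros(tsMillis, statValue)); // prints 1700000000123000
  }

  // Mirrors the unit check this commit introduces in Partitioning.fromParquetPrimitive.
  static long toIcebergMicros(PrimitiveType parquetType, long value) {
    boolean millis =
        parquetType.getLogicalTypeAnnotation()
                instanceof LogicalTypeAnnotation.TimestampLogicalTypeAnnotation t
            && t.getUnit() == LogicalTypeAnnotation.TimeUnit.MILLIS;
    return millis ? value * 1000L : value;
  }
}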

File tree

3 files changed: +39, -13 lines

ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java

Lines changed: 2 additions & 0 deletions
@@ -422,6 +422,8 @@ private static List<DataFile> processFile(
         logger.warn(
             "{} does not appear to be partitioned. Falling back to full scan (slow)",
             inputFile.location());
+      } else {
+        logger.info("{}: using inferred partition key {}", file, partitionKey);
       }
     }
 
ice/src/main/java/com/altinity/ice/cli/internal/iceberg/Partitioning.java

Lines changed: 34 additions & 8 deletions
@@ -42,6 +42,8 @@
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.PrimitiveType;
 
 public final class Partitioning {
 
@@ -157,8 +159,15 @@ public static void apply(UpdatePartitionSpec op, List<Main.IcePartition> columns
       Transform<Object, Object> transform = (Transform<Object, Object>) field.transform();
       SerializableFunction<Object, Object> boundTransform = transform.bind(type);
 
-      Object minTransformed = boundTransform.apply(stats.genericGetMin());
-      Object maxTransformed = boundTransform.apply(stats.genericGetMax());
+      PrimitiveType parquetType = stats.type();
+
+      Comparable<?> parquetMin = stats.genericGetMin();
+      var min = fromParquetPrimitive(type, parquetType, parquetMin);
+      Comparable<?> parquetMax = stats.genericGetMax();
+      var max = fromParquetPrimitive(type, parquetType, parquetMax);
+
+      Object minTransformed = boundTransform.apply(min);
+      Object maxTransformed = boundTransform.apply(max);
 
       if (!minTransformed.equals(maxTransformed)) {
         same = false;
@@ -167,7 +176,7 @@ public static void apply(UpdatePartitionSpec op, List<Main.IcePartition> columns
 
       if (valueTransformed == null) {
         valueTransformed = minTransformed;
-        value = stats.genericGetMin();
+        value = min;
       } else if (!valueTransformed.equals(minTransformed)) {
         same = false;
         break;
@@ -186,23 +195,40 @@ public static void apply(UpdatePartitionSpec op, List<Main.IcePartition> columns
     return partitionKey;
   }
 
+  // Copied from org.apache.iceberg.parquet.ParquetConversions.
+  private static Object fromParquetPrimitive(Type type, PrimitiveType parquetType, Object value) {
+    switch (type.typeId()) {
+      case TIME:
+      case TIMESTAMP:
+        // time & timestamp/timestamptz are stored in microseconds
+        // https://iceberg.apache.org/spec/#parquet
+        var millis =
+            (parquetType.getLogicalTypeAnnotation()
+                    instanceof LogicalTypeAnnotation.TimestampLogicalTypeAnnotation t
+                && t.getUnit() == LogicalTypeAnnotation.TimeUnit.MILLIS);
+        var v = ((Number) value).longValue();
+        return millis ? v * 1000L : v;
+    }
+    return value;
+  }
+
   private static Object decodeStatValue(Object parquetStatValue, Type icebergType) {
     if (parquetStatValue == null) return null;
     return switch (icebergType.typeId()) {
-      case STRING -> ((org.apache.parquet.io.api.Binary) parquetStatValue).toStringUsingUTF8();
+      case BOOLEAN -> parquetStatValue;
       case INTEGER -> ((Number) parquetStatValue).intValue();
       case LONG -> ((Number) parquetStatValue).longValue();
       case FLOAT -> ((Number) parquetStatValue).floatValue();
       case DOUBLE -> ((Number) parquetStatValue).doubleValue();
-      case BOOLEAN -> parquetStatValue;
       case DATE ->
           // Parquet DATE (INT32) is days since epoch (same as Iceberg DATE)
           ((Number) parquetStatValue).intValue();
-      case TIMESTAMP ->
+      case TIME, TIMESTAMP ->
           // Parquet timestamp might come as INT64 (micros) or Binary; assuming long micros for now
           ((Number) parquetStatValue).longValue();
-      case DECIMAL -> throw new UnsupportedOperationException();
-      default -> null;
+      case STRING -> ((org.apache.parquet.io.api.Binary) parquetStatValue).toStringUsingUTF8();
+      default ->
+          throw new UnsupportedOperationException("unsupported type: " + icebergType.typeId());
     };
   }
 
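For context, a hedged sketch (not from this commit) of where the stats values consumed by fromParquetPrimitive and decodeStatValue typically come from: per-row-group column statistics in the Parquet footer. The file name is a placeholder; everything else uses standard parquet-mr APIs.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.util.HadoopInputFile;

public final class FooterStatsSketch {
  public static void main(String[] args) throws Exception {
    Path path = new Path("data.parquet"); // hypothetical input file
    try (ParquetFileReader reader =
        ParquetFileReader.open(HadoopInputFile.fromPath(path, new Configuration()))) {
      for (BlockMetaData block : reader.getFooter().getBlocks()) {
        for (ColumnChunkMetaData column : block.getColumns()) {
          Statistics<?> stats = column.getStatistics();
          if (stats == null || !stats.hasNonNullValue()) {
            continue; // no usable min/max for this column chunk
          }
          // Raw Parquet min/max; for TIMESTAMP(MILLIS) columns these are epoch millis
          // and must be scaled to micros before an Iceberg transform is applied.
          System.out.printf(
              "%s: min=%s max=%s (%s)%n",
              column.getPath(), stats.genericGetMin(), stats.genericGetMax(),
              column.getPrimitiveType().getLogicalTypeAnnotation());
        }
      }
    }
  }
}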

ice/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java

Lines changed: 3 additions & 5 deletions
@@ -44,6 +44,7 @@ static <T> Literal<T> fromParquetPrimitive(Type type, PrimitiveType parquetType,
       case DATE:
         return (Literal<T>) Literal.of((Integer) value);
       case LONG:
+        return (Literal<T>) Literal.of((Long) value);
       case TIME:
       case TIMESTAMP:
         // time & timestamp/timestamptz are stored in microseconds
@@ -52,11 +53,8 @@ static <T> Literal<T> fromParquetPrimitive(Type type, PrimitiveType parquetType,
             (parquetType.getLogicalTypeAnnotation()
                     instanceof LogicalTypeAnnotation.TimestampLogicalTypeAnnotation t
                 && t.getUnit() == LogicalTypeAnnotation.TimeUnit.MILLIS);
-        if (parquetType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT32) { // uint32
-          long v = ((Integer) value).longValue();
-          return (Literal<T>) Literal.of(millis ? v * 1000L : v);
-        }
-        return (Literal<T>) Literal.of(millis ? (Long) value * 1000L : (Long) value);
+        var v = ((Number) value).longValue();
+        return (Literal<T>) Literal.of(millis ? v * 1000L : v);
       case FLOAT:
         return (Literal<T>) Literal.of((Float) value);
       case DOUBLE:
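
A small illustration (an assumption, not part of the commit) of why the two removed branches collapse into one: both the INT32-backed and INT64-backed statistic values arrive boxed, and ((Number) value).longValue() widens either of them the same way.

public final class NumberWideningSketch {
  public static void main(String[] args) {
    Object int32Backed = Integer.valueOf(86_400_000);        // e.g. a TIME(MILLIS) stat stored as INT32
    Object int64Backed = Long.valueOf(1_700_000_000_123L);   // e.g. a TIMESTAMP(MILLIS) stat stored as INT64
    System.out.println(((Number) int32Backed).longValue());  // 86400000
    System.out.println(((Number) int64Backed).longValue());  // 1700000000123
  }
}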

0 commit comments
