Skip to content

Commit 43a5d8a

Browse files
authored
[lake/iceberg] Fix local date and time class cast (apache#2620)
1 parent 2df2d6b commit 43a5d8a

File tree

3 files changed

+62
-6
lines changed

3 files changed

+62
-6
lines changed

fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergArrayAsFlussArray.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@
2929

3030
import java.math.BigDecimal;
3131
import java.nio.ByteBuffer;
32+
import java.time.LocalDate;
3233
import java.time.LocalDateTime;
34+
import java.time.LocalTime;
3335
import java.time.OffsetDateTime;
3436
import java.util.List;
3537
import java.util.Map;
@@ -72,7 +74,14 @@ public short getShort(int pos) {
7274

7375
@Override
7476
public int getInt(int pos) {
75-
return (Integer) icebergList.get(pos);
77+
Object value = icebergList.get(pos);
78+
if (value instanceof LocalDate) {
79+
return (int) ((LocalDate) value).toEpochDay();
80+
}
81+
if (value instanceof LocalTime) {
82+
return (int) ((LocalTime) value).toNanoOfDay() / 1_000_000;
83+
}
84+
return (Integer) value;
7685
}
7786

7887
@Override

fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,9 @@
3131

3232
import java.math.BigDecimal;
3333
import java.nio.ByteBuffer;
34+
import java.time.LocalDate;
3435
import java.time.LocalDateTime;
36+
import java.time.LocalTime;
3537
import java.time.OffsetDateTime;
3638
import java.util.List;
3739
import java.util.Map;
@@ -86,6 +88,14 @@ public short getShort(int pos) {
8688
@Override
8789
public int getInt(int pos) {
8890
Object value = icebergRecord.get(pos);
91+
// Iceberg returns LocalDate for DATE columns and LocalTime for TIME columns,
92+
// but Fluss InternalRow uses getInt() for both (epoch days and millis-of-day).
93+
if (value instanceof LocalDate) {
94+
return (int) ((LocalDate) value).toEpochDay();
95+
}
96+
if (value instanceof LocalTime) {
97+
return (int) ((LocalTime) value).toNanoOfDay() / 1_000_000;
98+
}
8999
return (Integer) value;
90100
}
91101

fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRowTest.java

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.fluss.lake.iceberg.source;
2020

21+
import org.apache.fluss.row.InternalArray;
2122
import org.apache.fluss.row.InternalRow;
2223

2324
import org.apache.iceberg.Schema;
@@ -29,9 +30,12 @@
2930

3031
import java.math.BigDecimal;
3132
import java.nio.ByteBuffer;
33+
import java.time.LocalDate;
3234
import java.time.LocalDateTime;
35+
import java.time.LocalTime;
3336
import java.time.OffsetDateTime;
3437
import java.time.ZoneOffset;
38+
import java.util.Arrays;
3539
import java.util.HashMap;
3640
import java.util.Map;
3741

@@ -87,7 +91,16 @@ void setUp() {
8791
"map_field",
8892
Types.MapType.ofOptional(
8993
20, 21, Types.StringType.get(), Types.IntegerType.get())),
90-
// System columns
94+
optional(25, "date_field", Types.DateType.get()),
95+
optional(26, "time_field", Types.TimeType.get()),
96+
optional(
97+
27,
98+
"date_array",
99+
Types.ListType.ofOptional(28, Types.DateType.get())),
100+
optional(
101+
29,
102+
"time_array",
103+
Types.ListType.ofOptional(30, Types.TimeType.get())),
91104
required(22, "__bucket", Types.IntegerType.get()),
92105
required(23, "__offset", Types.LongType.get()),
93106
required(24, "__timestamp", Types.TimestampType.withZone()));
@@ -107,8 +120,7 @@ void testGetFieldCount() {
107120

108121
icebergRecordAsFlussRow.replaceIcebergRecord(record);
109122

110-
// Should return count excluding system columns (3 system columns)
111-
assertThat(icebergRecordAsFlussRow.getFieldCount()).isEqualTo(15);
123+
assertThat(icebergRecordAsFlussRow.getFieldCount()).isEqualTo(19);
112124
}
113125

114126
@Test
@@ -170,8 +182,33 @@ void testAllDataTypes() {
170182
assertThat(icebergRecordAsFlussRow.getChar(12, 10).toString())
171183
.isEqualTo("Hello"); // char_data
172184

173-
// Test field count (excluding system columns)
174-
assertThat(icebergRecordAsFlussRow.getFieldCount()).isEqualTo(15);
185+
assertThat(icebergRecordAsFlussRow.getFieldCount()).isEqualTo(19);
186+
}
187+
188+
@Test
189+
void testGetIntWithLocalDateAndLocalTime() {
190+
LocalDate date = LocalDate.of(2020, 6, 15);
191+
LocalTime time = LocalTime.of(14, 30, 0);
192+
193+
record.setField("id", 1L);
194+
record.setField("date_field", date);
195+
record.setField("time_field", time);
196+
record.setField("date_array", Arrays.asList(date));
197+
record.setField("time_array", Arrays.asList(time));
198+
record.setField("__bucket", 1);
199+
record.setField("__offset", 100L);
200+
record.setField("__timestamp", OffsetDateTime.now(ZoneOffset.UTC));
201+
202+
icebergRecordAsFlussRow.replaceIcebergRecord(record);
203+
204+
assertThat(icebergRecordAsFlussRow.getInt(15)).isEqualTo((int) date.toEpochDay());
205+
assertThat(icebergRecordAsFlussRow.getInt(16))
206+
.isEqualTo((int) time.toNanoOfDay() / 1_000_000);
207+
208+
InternalArray dateArray = icebergRecordAsFlussRow.getArray(17);
209+
InternalArray timeArray = icebergRecordAsFlussRow.getArray(18);
210+
assertThat(dateArray.getInt(0)).isEqualTo((int) date.toEpochDay());
211+
assertThat(timeArray.getInt(0)).isEqualTo((int) time.toNanoOfDay() / 1_000_000);
175212
}
176213

177214
@Test

0 commit comments

Comments
 (0)