Skip to content

Commit 36b01c5

Browse files
committed
fix time64 unit and add more test
1 parent b42bda5 commit 36b01c5

File tree

2 files changed

+126
-1
lines changed

2 files changed

+126
-1
lines changed

src/iceberg/parquet/parquet_schema_util.cc

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,11 @@ Status ValidateParquetSchemaEvolution(
101101
break;
102102
case TypeId::kTime:
103103
if (arrow_type->id() == ::arrow::Type::TIME64) {
104-
return {};
104+
const auto& time_type =
105+
internal::checked_cast<const ::arrow::Time64Type&>(*arrow_type);
106+
if (time_type.unit() == ::arrow::TimeUnit::MICRO) {
107+
return {};
108+
}
105109
}
106110
break;
107111
case TypeId::kTimestamp:

test/parquet_schema_test.cc

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,12 @@
2121
#include <parquet/arrow/reader.h>
2222
#include <parquet/arrow/schema.h>
2323
#include <parquet/schema.h>
24+
#include <parquet/types.h>
2425

2526
#include "iceberg/metadata_columns.h"
2627
#include "iceberg/parquet/parquet_schema_util_internal.h"
2728
#include "iceberg/schema.h"
29+
#include "iceberg/type.h"
2830
#include "matchers.h"
2931

3032
namespace iceberg::parquet {
@@ -511,4 +513,123 @@ TEST(ParquetSchemaProjectionTest, ProjectDuplicateFieldIds) {
511513
ASSERT_THAT(projection_result, HasErrorMessage("Duplicate field id"));
512514
}
513515

516+
TEST(ParquetSchemaProjectionTest, ProjectPrimitiveType) {
517+
struct TestCase {
518+
std::shared_ptr<Type> iceberg_type;
519+
::parquet::Type::type parquet_type;
520+
std::shared_ptr<const ::parquet::LogicalType> parquet_logical_type;
521+
int32_t primitive_length = -1;
522+
};
523+
524+
std::vector<TestCase> test_cases = {
525+
TestCase{.iceberg_type = float64(), .parquet_type = ::parquet::Type::DOUBLE},
526+
TestCase{.iceberg_type = float32(), .parquet_type = ::parquet::Type::FLOAT},
527+
TestCase{.iceberg_type = int64(), .parquet_type = ::parquet::Type::INT64},
528+
TestCase{.iceberg_type = int32(), .parquet_type = ::parquet::Type::INT32},
529+
TestCase{.iceberg_type = string(),
530+
.parquet_type = ::parquet::Type::BYTE_ARRAY,
531+
.parquet_logical_type = ::parquet::LogicalType::String()},
532+
TestCase{.iceberg_type = binary(), .parquet_type = ::parquet::Type::BYTE_ARRAY},
533+
TestCase{.iceberg_type = boolean(), .parquet_type = ::parquet::Type::BOOLEAN},
534+
TestCase{.iceberg_type = date(),
535+
.parquet_type = ::parquet::Type::INT32,
536+
.parquet_logical_type = ::parquet::LogicalType::Date()},
537+
TestCase{
538+
.iceberg_type = time(),
539+
.parquet_type = ::parquet::Type::INT64,
540+
.parquet_logical_type = ::parquet::LogicalType::Time(
541+
/*is_adjusted_to_utc=*/true, ::parquet::LogicalType::TimeUnit::MICROS)},
542+
TestCase{
543+
.iceberg_type = timestamp(),
544+
.parquet_type = ::parquet::Type::INT64,
545+
.parquet_logical_type = ::parquet::LogicalType::Timestamp(
546+
/*is_adjusted_to_utc=*/false, ::parquet::LogicalType::TimeUnit::MICROS)},
547+
TestCase{
548+
.iceberg_type = timestamp_tz(),
549+
.parquet_type = ::parquet::Type::INT64,
550+
.parquet_logical_type = ::parquet::LogicalType::Timestamp(
551+
/*is_adjusted_to_utc=*/true, ::parquet::LogicalType::TimeUnit::MICROS)},
552+
TestCase{.iceberg_type = decimal(4, 2),
553+
.parquet_type = ::parquet::Type::INT32,
554+
.parquet_logical_type = ::parquet::LogicalType::Decimal(4, 2)},
555+
TestCase{.iceberg_type = decimal(38, 18),
556+
.parquet_type = ::parquet::Type::FIXED_LEN_BYTE_ARRAY,
557+
.parquet_logical_type = ::parquet::LogicalType::Decimal(38, 18),
558+
.primitive_length = 16},
559+
TestCase{.iceberg_type = uuid(),
560+
.parquet_type = ::parquet::Type::FIXED_LEN_BYTE_ARRAY,
561+
.parquet_logical_type = ::parquet::LogicalType::UUID(),
562+
.primitive_length = 16},
563+
TestCase{.iceberg_type = fixed(8),
564+
.parquet_type = ::parquet::Type::FIXED_LEN_BYTE_ARRAY,
565+
.primitive_length = 8}};
566+
567+
for (const auto& test_case : test_cases) {
568+
Schema expected_schema({SchemaField::MakeRequired(/*field_id=*/1, "test_field",
569+
test_case.iceberg_type)});
570+
auto parquet_schema = MakeGroupNode(
571+
"iceberg_schema",
572+
{::parquet::schema::PrimitiveNode::Make(
573+
"test_field", ::parquet::Repetition::REQUIRED, test_case.parquet_logical_type,
574+
test_case.parquet_type, test_case.primitive_length,
575+
/*field_id=*/1)});
576+
577+
auto schema_manifest = MakeSchemaManifest(parquet_schema);
578+
auto projection_result = Project(expected_schema, schema_manifest);
579+
ASSERT_THAT(projection_result, IsOk());
580+
581+
const auto& projection = *projection_result;
582+
ASSERT_EQ(projection.fields.size(), 1);
583+
ASSERT_PROJECTED_FIELD(projection.fields[0], 0);
584+
}
585+
}
586+
587+
TEST(ParquetSchemaProjectionTest, UnsuportedProjection) {
588+
struct TestCase {
589+
std::shared_ptr<Type> iceberg_type;
590+
::parquet::Type::type parquet_type;
591+
std::shared_ptr<const ::parquet::LogicalType> parquet_logical_type;
592+
int32_t primitive_length = -1;
593+
};
594+
595+
std::vector<TestCase> test_cases = {
596+
TestCase{.iceberg_type = float32(), .parquet_type = ::parquet::Type::DOUBLE},
597+
TestCase{.iceberg_type = int32(), .parquet_type = ::parquet::Type::INT64},
598+
TestCase{.iceberg_type = date(), .parquet_type = ::parquet::Type::INT32},
599+
TestCase{.iceberg_type = time(),
600+
.parquet_type = ::parquet::Type::INT64,
601+
.parquet_logical_type = ::parquet::LogicalType::Time(
602+
/*is_adjusted_to_utc=*/true, ::parquet::LogicalType::TimeUnit::NANOS)},
603+
TestCase{
604+
.iceberg_type = timestamp(),
605+
.parquet_type = ::parquet::Type::INT64,
606+
.parquet_logical_type = ::parquet::LogicalType::Timestamp(
607+
/*is_adjusted_to_utc=*/false, ::parquet::LogicalType::TimeUnit::NANOS)},
608+
TestCase{.iceberg_type = timestamp_tz(),
609+
.parquet_type = ::parquet::Type::INT64,
610+
.parquet_logical_type = ::parquet::LogicalType::Timestamp(
611+
/*is_adjusted_to_utc=*/true, ::parquet::LogicalType::TimeUnit::NANOS)},
612+
TestCase{.iceberg_type = decimal(4, 2),
613+
.parquet_type = ::parquet::Type::INT32,
614+
.parquet_logical_type = ::parquet::LogicalType::Decimal(4, 1)},
615+
TestCase{.iceberg_type = fixed(8),
616+
.parquet_type = ::parquet::Type::FIXED_LEN_BYTE_ARRAY,
617+
.primitive_length = 4}};
618+
619+
for (const auto& test_case : test_cases) {
620+
Schema expected_schema({SchemaField::MakeRequired(/*field_id=*/1, "test_field",
621+
test_case.iceberg_type)});
622+
auto parquet_schema = MakeGroupNode(
623+
"iceberg_schema",
624+
{::parquet::schema::PrimitiveNode::Make(
625+
"test_field", ::parquet::Repetition::REQUIRED, test_case.parquet_logical_type,
626+
test_case.parquet_type, test_case.primitive_length,
627+
/*field_id=*/1)});
628+
629+
auto schema_manifest = MakeSchemaManifest(parquet_schema);
630+
auto projection_result = Project(expected_schema, schema_manifest);
631+
ASSERT_THAT(projection_result, HasErrorMessage("Cannot read"));
632+
}
633+
}
634+
514635
} // namespace iceberg::parquet

0 commit comments

Comments
 (0)