Closed

23 commits
cba31c7
Support decimal32/64 in schema conversion
curioustien Jan 19, 2025
a9398a2
Support decimal32/64 in column writer
curioustien Jan 19, 2025
e1dc023
Restrict column writer with correct decimal types
curioustien Jan 19, 2025
6032b02
Support decimal32/64 in reader & vector kernels & tests
curioustien Jan 19, 2025
290de24
Pyarrow parquet to pandas
curioustien Jan 26, 2025
e5b996e
Address comments
curioustien Feb 15, 2025
44f1adc
Add more tests in arrow_schema_test
curioustien Feb 15, 2025
c017323
Add more tests in arrow_reader_writer_test
curioustien Feb 16, 2025
63d307b
Add more typed tests for small decimals
curioustien Feb 16, 2025
77dd7d3
Document new flag
curioustien Feb 16, 2025
d81cf13
Add decimal32/64 list type support arrow to pandas
curioustien Feb 16, 2025
424472f
Support smallest_decimal_enabled flag in pyarrow
curioustien Feb 16, 2025
d1687a7
Revert writer schema manifest arg passing change
curioustien Mar 9, 2025
1f0fb7b
Merge remote-tracking branch 'upstream/main' into parquet-decimal-test
curioustien Mar 22, 2025
52711d5
Fix lint
curioustien Mar 22, 2025
f64d6d9
Remove extra doc
curioustien Mar 22, 2025
3fb307e
Revert FileReader changes
curioustien Mar 29, 2025
f279349
Delay scratch buffer pointer cast
curioustien Mar 29, 2025
8a78c72
Use ArrowReaderProperties
curioustien Mar 29, 2025
29e98ff
Merge remote-tracking branch 'upstream/main' into parquet-decimal-test
curioustien Mar 29, 2025
d2e1ffa
Revert "Delay scratch buffer pointer cast"
curioustien Apr 4, 2025
a8304f3
Remove mistake include
curioustien Apr 4, 2025
de295e3
Merge remote-tracking branch 'upstream/main' into parquet-decimal-test
curioustien Apr 4, 2025
4 changes: 3 additions & 1 deletion cpp/src/arrow/compute/kernels/vector_hash.cc
@@ -555,6 +555,7 @@ KernelInit GetHashInit(Type::type type_id) {
case Type::DATE32:
case Type::TIME32:
case Type::INTERVAL_MONTHS:
case Type::DECIMAL32:
Member: Are the changes to the compute kernels required to support Parquet? I can't see why, but I might be missing something. Otherwise, we should move adding decimal32 and decimal64 support to those compute kernels to a different PR and leave this one with only the required Parquet changes.

Member: OK, I see now; the description says this is required for some tests: "Allow decimal32/64 in Arrow compute vector hash which is needed for some of the existing Parquet tests".

Contributor (author): I'm down to split this change into another PR, which can cover this support with more tests on the Arrow compute side. But yes, there are a few tests in Parquet that hit the Arrow vector kernel code path.

return HashInit<RegularHashKernel<UInt32Type, Action>>;
case Type::INT64:
case Type::UINT64:
@@ -564,6 +565,7 @@ KernelInit GetHashInit(Type::type type_id) {
case Type::TIMESTAMP:
case Type::DURATION:
case Type::INTERVAL_DAY_TIME:
case Type::DECIMAL64:
return HashInit<RegularHashKernel<UInt64Type, Action>>;
case Type::BINARY:
case Type::STRING:
@@ -707,7 +709,7 @@ void AddHashKernels(VectorFunction* func, VectorKernel base, OutputType out_ty)
DCHECK_OK(func->AddKernel(base));
}

for (auto t : {Type::DECIMAL128, Type::DECIMAL256}) {
for (auto t : {Type::DECIMAL32, Type::DECIMAL64, Type::DECIMAL128, Type::DECIMAL256}) {
base.init = GetHashInit<Action>(t);
base.signature = KernelSignature::Make({t}, out_ty);
DCHECK_OK(func->AddKernel(base));
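The hunks above route DECIMAL32 through the same hash kernel as the 4-byte types (UInt32, DATE32, …) and DECIMAL64 through the 8-byte kernel, because a decimal's unscaled value has the same fixed-width storage. A minimal Python sketch of that dispatch-by-storage-width idea (the names, table, and use of CRC32 are illustrative, not Arrow's implementation):

```python
import struct
import zlib

# Illustrative dispatch table: each logical type maps to the byte width of
# its fixed-size storage, mirroring how the kernel reuses the UInt32 path
# for Decimal32 and the UInt64 path for Decimal64.
STORAGE_BYTES = {
    "uint32": 4, "date32": 4, "decimal32": 4,
    "uint64": 8, "timestamp": 8, "decimal64": 8,
}

def hash_fixed_width(type_name: str, unscaled_value: int) -> int:
    """Hash a value through the kernel chosen for its storage width."""
    width = STORAGE_BYTES[type_name]
    fmt = "<i" if width == 4 else "<q"  # little-endian 4- or 8-byte int
    return zlib.crc32(struct.pack(fmt, unscaled_value))

# decimal32(8, 2) stores 123.45 as the unscaled int 12345; it hashes the
# same as any other 4-byte type holding the same bits.
assert hash_fixed_width("decimal32", 12345) == hash_fixed_width("uint32", 12345)
```

This is why no new hash implementation is needed: only the dispatch table grows.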
3 changes: 2 additions & 1 deletion cpp/src/arrow/compute/kernels/vector_selection.cc
@@ -308,7 +308,8 @@ std::shared_ptr<VectorFunction> MakeIndicesNonZeroFunction(std::string name,
AddKernels(NumericTypes());
AddKernels({boolean()});

for (const auto& ty : {Type::DECIMAL128, Type::DECIMAL256}) {
for (const auto& ty :
{Type::DECIMAL32, Type::DECIMAL64, Type::DECIMAL128, Type::DECIMAL256}) {
kernel.signature = KernelSignature::Make({ty}, uint64());
DCHECK_OK(func->AddKernel(kernel));
}
2 changes: 2 additions & 0 deletions cpp/src/arrow/dataset/file_parquet.cc
@@ -132,6 +132,8 @@ parquet::ArrowReaderProperties MakeArrowReaderProperties(
parquet_scan_options.arrow_reader_properties->cache_options());
arrow_properties.set_io_context(
parquet_scan_options.arrow_reader_properties->io_context());
arrow_properties.set_smallest_decimal_enabled(
parquet_scan_options.arrow_reader_properties->smallest_decimal_enabled());
arrow_properties.set_use_threads(options.use_threads);
return arrow_properties;
}
263 changes: 207 additions & 56 deletions cpp/src/parquet/arrow/arrow_reader_writer_test.cc

Large diffs are not rendered by default.

119 changes: 119 additions & 0 deletions cpp/src/parquet/arrow/arrow_schema_test.cc
@@ -268,6 +268,43 @@ TEST_F(TestConvertParquetSchema, ParquetAnnotatedFields) {
ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(arrow_schema));
}

TEST_F(TestConvertParquetSchema, ParquetAnnotatedFieldsSmallestDecimal) {
struct FieldConstructionArguments {
std::string name;
std::shared_ptr<const LogicalType> logical_type;
parquet::Type::type physical_type;
int physical_length;
std::shared_ptr<::arrow::DataType> datatype;
};

std::vector<FieldConstructionArguments> cases = {
{"decimal(8, 2)", LogicalType::Decimal(8, 2), ParquetType::INT32, -1,
::arrow::decimal32(8, 2)},
{"decimal(16, 4)", LogicalType::Decimal(16, 4), ParquetType::INT64, -1,
::arrow::decimal64(16, 4)},
{"decimal(32, 8)", LogicalType::Decimal(32, 8), ParquetType::FIXED_LEN_BYTE_ARRAY,
16, ::arrow::decimal128(32, 8)},
{"decimal(73, 38)", LogicalType::Decimal(73, 38), ParquetType::FIXED_LEN_BYTE_ARRAY,
31, ::arrow::decimal256(73, 38)},
};

std::vector<NodePtr> parquet_fields;
std::vector<std::shared_ptr<Field>> arrow_fields;

for (const FieldConstructionArguments& c : cases) {
parquet_fields.push_back(PrimitiveNode::Make(c.name, Repetition::OPTIONAL,
c.logical_type, c.physical_type,
c.physical_length));
arrow_fields.push_back(::arrow::field(c.name, c.datatype));
}

auto reader_props = ArrowReaderProperties();
reader_props.set_smallest_decimal_enabled(true);
ASSERT_OK(ConvertSchema(parquet_fields, nullptr, reader_props));
auto arrow_schema = ::arrow::schema(arrow_fields);
ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(arrow_schema));
}
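The four cases in the test above follow a purely precision-driven rule: with the flag on, a decimal goes to the smallest Arrow decimal type whose digit capacity fits (Decimal32 holds up to 9 digits, Decimal64 up to 18, Decimal128 up to 38). A hedged Python sketch of that selection rule (function name and string results are illustrative, not the actual schema-conversion code):

```python
# Sketch of the precision-driven decimal mapping the test above checks.
# Thresholds come from the decimal ranges: 9 digits fit in 32 bits,
# 18 in 64, 38 in 128, 76 in 256.
def arrow_decimal_for(precision: int, smallest_decimal_enabled: bool) -> str:
    if smallest_decimal_enabled:
        if precision <= 9:
            return "decimal32"
        if precision <= 18:
            return "decimal64"
    if precision <= 38:
        return "decimal128"
    return "decimal256"

# Mirrors the four cases in the test:
assert arrow_decimal_for(8, True) == "decimal32"
assert arrow_decimal_for(16, True) == "decimal64"
assert arrow_decimal_for(32, True) == "decimal128"
assert arrow_decimal_for(73, True) == "decimal256"
# With the flag off, small precisions keep the pre-existing decimal128 mapping.
assert arrow_decimal_for(8, False) == "decimal128"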

TEST_F(TestConvertParquetSchema, DuplicateFieldNames) {
std::vector<NodePtr> parquet_fields;
std::vector<std::shared_ptr<Field>> arrow_fields;
@@ -354,6 +391,42 @@ TEST_F(TestConvertParquetSchema, ParquetFlatDecimals) {
ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(arrow_schema));
}

TEST_F(TestConvertParquetSchema, ParquetSmallestDecimals) {
std::vector<NodePtr> parquet_fields;
std::vector<std::shared_ptr<Field>> arrow_fields;

parquet_fields.push_back(PrimitiveNode::Make("flba-decimal", Repetition::OPTIONAL,
ParquetType::FIXED_LEN_BYTE_ARRAY,
ConvertedType::DECIMAL, 4, 8, 4));
arrow_fields.push_back(
::arrow::field("flba-decimal", std::make_shared<::arrow::Decimal32Type>(8, 4)));

parquet_fields.push_back(PrimitiveNode::Make("binary-decimal", Repetition::OPTIONAL,
ParquetType::BYTE_ARRAY,
ConvertedType::DECIMAL, -1, 18, 4));
arrow_fields.push_back(
::arrow::field("binary-decimal", std::make_shared<::arrow::Decimal64Type>(18, 4)));

parquet_fields.push_back(PrimitiveNode::Make("int32-decimal", Repetition::OPTIONAL,
ParquetType::INT32, ConvertedType::DECIMAL,
-1, 38, 4));
arrow_fields.push_back(
::arrow::field("int32-decimal", std::make_shared<::arrow::Decimal128Type>(38, 4)));

parquet_fields.push_back(PrimitiveNode::Make("int64-decimal", Repetition::OPTIONAL,
ParquetType::INT64, ConvertedType::DECIMAL,
-1, 48, 4));
arrow_fields.push_back(
::arrow::field("int64-decimal", std::make_shared<::arrow::Decimal256Type>(48, 4)));

auto arrow_schema = ::arrow::schema(arrow_fields);
auto reader_props = ArrowReaderProperties();
reader_props.set_smallest_decimal_enabled(true);
ASSERT_OK(ConvertSchema(parquet_fields, nullptr, reader_props));

ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(arrow_schema));
}

TEST_F(TestConvertParquetSchema, ParquetMaps) {
std::vector<NodePtr> parquet_fields;
std::vector<std::shared_ptr<Field>> arrow_fields;
@@ -1134,6 +1207,52 @@ TEST_F(TestConvertArrowSchema, ArrowFields) {
// ASSERT_NO_FATAL_FAILURE();
}

TEST_F(TestConvertArrowSchema, ArrowFieldsStoreSchema) {
struct FieldConstructionArguments {
std::string name;
std::shared_ptr<::arrow::DataType> datatype;
std::shared_ptr<const LogicalType> logical_type;
parquet::Type::type physical_type;
int physical_length;
};

std::vector<FieldConstructionArguments> cases = {
{"decimal(1, 0)", ::arrow::decimal128(1, 0), LogicalType::Decimal(1, 0),
ParquetType::FIXED_LEN_BYTE_ARRAY, 1},
{"decimal(8, 2)", ::arrow::decimal128(8, 2), LogicalType::Decimal(8, 2),
ParquetType::FIXED_LEN_BYTE_ARRAY, 4},
{"decimal(16, 4)", ::arrow::decimal128(16, 4), LogicalType::Decimal(16, 4),
ParquetType::FIXED_LEN_BYTE_ARRAY, 7},
{"decimal(32, 8)", ::arrow::decimal128(32, 8), LogicalType::Decimal(32, 8),
ParquetType::FIXED_LEN_BYTE_ARRAY, 14},
{"decimal(1, 0)", ::arrow::decimal32(1, 0), LogicalType::Decimal(1, 0),
ParquetType::FIXED_LEN_BYTE_ARRAY, 1},
{"decimal(8, 2)", ::arrow::decimal32(8, 2), LogicalType::Decimal(8, 2),
ParquetType::FIXED_LEN_BYTE_ARRAY, 4},
{"decimal(16, 4)", ::arrow::decimal64(16, 4), LogicalType::Decimal(16, 4),
ParquetType::FIXED_LEN_BYTE_ARRAY, 7},
{"decimal(32, 8)", ::arrow::decimal128(32, 8), LogicalType::Decimal(32, 8),
ParquetType::FIXED_LEN_BYTE_ARRAY, 14},
{"decimal(73, 38)", ::arrow::decimal256(73, 38), LogicalType::Decimal(73, 38),
ParquetType::FIXED_LEN_BYTE_ARRAY, 31}};

std::vector<std::shared_ptr<Field>> arrow_fields;
std::vector<NodePtr> parquet_fields;

for (const FieldConstructionArguments& c : cases) {
arrow_fields.push_back(::arrow::field(c.name, c.datatype, false));
parquet_fields.push_back(PrimitiveNode::Make(c.name, Repetition::REQUIRED,
c.logical_type, c.physical_type,
c.physical_length));
}

auto writer_props = ::parquet::default_arrow_writer_properties();
writer_props->store_schema();
ASSERT_OK(ConvertSchema(arrow_fields, writer_props));
CheckFlatSchema(parquet_fields);
// ASSERT_NO_FATAL_FAILURE();
}
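The physical lengths in the cases above (1, 4, 7, 14, 31 bytes for precisions 1, 8, 16, 32, 73) are the minimum FIXED_LEN_BYTE_ARRAY widths for each precision. A hedged sketch of the formula (illustrative, not the actual Parquet implementation): enough bits to represent 10^precision − 1 plus a sign bit, rounded up to whole bytes.

```python
import math

# Minimum FIXED_LEN_BYTE_ARRAY width for a decimal of a given precision:
# bits for 10^precision - 1, plus one sign bit, rounded up to whole bytes.
# Sketch only; the real writer may use a precomputed table.
def decimal_flba_bytes(precision: int) -> int:
    bits_needed = precision * math.log2(10) + 1  # +1 for the sign bit
    return math.ceil(bits_needed / 8)

# Matches the physical lengths in the cases above.
assert [decimal_flba_bytes(p) for p in (1, 8, 16, 32, 73)] == [1, 4, 7, 14, 31]
```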

TEST_F(TestConvertArrowSchema, ArrowNonconvertibleFields) {
struct FieldConstructionArguments {
std::string name;
9 changes: 7 additions & 2 deletions cpp/src/parquet/arrow/reader.cc
@@ -362,6 +362,10 @@ class FileReaderImpl : public FileReader {
reader_properties_.set_batch_size(batch_size);
}

void set_smallest_decimal_enabled(bool smallest_decimal_enabled) override {
reader_properties_.set_smallest_decimal_enabled(smallest_decimal_enabled);
}

const ArrowReaderProperties& properties() const override { return reader_properties_; }

const SchemaManifest& manifest() const override { return manifest_; }
@@ -1401,10 +1405,11 @@ Status OpenFile(std::shared_ptr<::arrow::io::RandomAccessFile> file, MemoryPool*
}

Result<std::unique_ptr<FileReader>> OpenFile(
std::shared_ptr<::arrow::io::RandomAccessFile> file, MemoryPool* pool) {
std::shared_ptr<::arrow::io::RandomAccessFile> file, MemoryPool* pool,
const ArrowReaderProperties& reader_properties) {
FileReaderBuilder builder;
RETURN_NOT_OK(builder.Open(std::move(file)));
return builder.memory_pool(pool)->Build();
return builder.memory_pool(pool)->properties(reader_properties)->Build();
}

namespace internal {
6 changes: 5 additions & 1 deletion cpp/src/parquet/arrow/reader.h
@@ -303,6 +303,9 @@ class PARQUET_EXPORT FileReader {
/// Set number of records to read per batch for the RecordBatchReader.
virtual void set_batch_size(int64_t batch_size) = 0;

/// Set whether to map decimals to the smallest fitting Arrow decimal type
virtual void set_smallest_decimal_enabled(bool smallest_decimal_enabled) = 0;
Member: This looks weird. We cannot change it after the reader has been created. Isn't it accessible via the ArrowReaderProperties?


virtual const ArrowReaderProperties& properties() const = 0;

virtual const SchemaManifest& manifest() const = 0;
@@ -403,7 +406,8 @@ ::arrow::Status OpenFile(std::shared_ptr<::arrow::io::RandomAccessFile>,
/// Advanced settings are supported through the FileReaderBuilder class.
PARQUET_EXPORT
::arrow::Result<std::unique_ptr<FileReader>> OpenFile(
std::shared_ptr<::arrow::io::RandomAccessFile>, ::arrow::MemoryPool* allocator);
std::shared_ptr<::arrow::io::RandomAccessFile>, ::arrow::MemoryPool* pool,
Member: Please also revert this change.

const ArrowReaderProperties& reader_properties = default_arrow_reader_properties());

/// @}
