Skip to content

Commit 55ad441

Browse files
Added wide dates and decimals support for s3:parquet (#25186) (#26076)
2 parents a777fcf + 4848bc5 commit 55ad441

File tree

7 files changed

+498
-46
lines changed

7 files changed

+498
-46
lines changed

ydb/library/yql/providers/s3/actors/ya.make

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,9 @@ PEERDIR(
2929
ydb/core/base
3030
ydb/core/fq/libs/events
3131
ydb/library/yql/dq/actors/compute
32-
yql/essentials/minikql/computation
3332
ydb/library/yql/providers/common/arrow
3433
ydb/library/yql/providers/common/arrow/interface
3534
ydb/library/yql/providers/common/http_gateway
36-
yql/essentials/providers/common/schema/mkql
3735
ydb/library/yql/providers/common/token_accessor/client
3836
ydb/library/yql/providers/generic/pushdown
3937
ydb/library/yql/providers/s3/actors_factory
@@ -44,9 +42,12 @@ PEERDIR(
4442
ydb/library/yql/providers/s3/object_listers
4543
ydb/library/yql/providers/s3/proto
4644
ydb/library/yql/providers/s3/range_helpers
45+
ydb/library/yql/udfs/common/clickhouse/client
46+
yql/essentials/minikql
47+
yql/essentials/minikql/computation
48+
yql/essentials/providers/common/schema/mkql
4749
yql/essentials/public/issue
4850
yql/essentials/public/types
49-
ydb/library/yql/udfs/common/clickhouse/client
5051
yql/essentials/utils
5152
)
5253

ydb/library/yql/providers/s3/actors/yql_arrow_column_converters.cpp

Lines changed: 104 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#include <yql/essentials/public/udf/arrow/block_builder.h>
1313
#include <yql/essentials/public/udf/arrow/block_item.h>
1414
#include <yql/essentials/public/udf/arrow/block_reader.h>
15+
#include <yql/essentials/public/udf/udf_data_type.h>
1516
#include <yql/essentials/utils/yql_panic.h>
1617

1718
#include <arrow/api.h>
@@ -89,9 +90,9 @@ ui32 GetMultiplierForDatetime(arrow::TimeUnit::type unit) {
8990
case arrow::TimeUnit::SECOND:
9091
return 1;
9192
case arrow::TimeUnit::MILLI:
92-
throw parquet::ParquetException(TStringBuilder() << "millisecond accuracy does not fit into the datetime");
93+
return 1000;
9394
case arrow::TimeUnit::MICRO:
94-
throw parquet::ParquetException(TStringBuilder() << "microsecond accuracy does not fit into the datetime");
95+
return 1000000;
9596
case arrow::TimeUnit::NANO:
9697
throw parquet::ParquetException(TStringBuilder() << "nanosecond accuracy does not fit into the datetime");
9798
}
@@ -127,6 +128,35 @@ std::shared_ptr<arrow::Array> ArrowTypeAsYqlDatetime(const std::shared_ptr<arrow
127128
return builder.Build(true).make_array();
128129
}
129130

131+
template <bool isOptional, typename TArrowType>
132+
std::shared_ptr<arrow::Array> ArrowTimestampAsYqlDatetime(const std::shared_ptr<arrow::DataType>& targetType, const std::shared_ptr<arrow::Array>& value, ui32 multiplier) {
133+
::NYql::NUdf::TFixedSizeArrayBuilder<TArrowType, isOptional> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), targetType, *arrow::system_memory_pool(), value->length());
134+
::NYql::NUdf::TFixedSizeBlockReader<i64, isOptional> reader;
135+
for (i64 i = 0; i < value->length(); ++i) {
136+
const NUdf::TBlockItem item = reader.GetItem(*value->data(), i);
137+
if constexpr (isOptional) {
138+
if (!item) {
139+
builder.Add(item);
140+
continue;
141+
}
142+
} else if (!item) {
143+
throw parquet::ParquetException(TStringBuilder() << "null value for datetime could not be represented in non-optional type");
144+
}
145+
146+
const i64 baseValue = item.As<i64>();
147+
if (baseValue < 0 && baseValue > static_cast<int64_t>(::NYql::NUdf::MAX_DATETIME)) {
148+
throw parquet::ParquetException(TStringBuilder() << "datetime in parquet is out of range [0, " << ::NYql::NUdf::MAX_DATETIME << "]: " << baseValue);
149+
}
150+
151+
if (baseValue % multiplier) {
152+
throw parquet::ParquetException(TStringBuilder() << "datetime in parquet should have integer amount of seconds, have: " << baseValue * 1.0 / multiplier);
153+
}
154+
const TArrowType v = baseValue / static_cast<ui64>(multiplier);
155+
builder.Add(NUdf::TBlockItem(static_cast<TArrowType>(v)));
156+
}
157+
return builder.Build(true).make_array();
158+
}
159+
130160
template <bool isOptional>
131161
std::shared_ptr<arrow::Array> ArrowStringAsYqlDateTime(const std::shared_ptr<arrow::DataType>& targetType, const std::shared_ptr<arrow::Array>& value, const NDB::FormatSettings& formatSettings) {
132162
::NYql::NUdf::TFixedSizeArrayBuilder<ui32, isOptional> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), targetType, *arrow::system_memory_pool(), value->length());
@@ -330,8 +360,16 @@ TColumnConverter ArrowDate64AsYqlDatetime(const std::shared_ptr<arrow::DataType>
330360
// Builds a converter from an arrow timestamp column to a YQL Datetime block with
// ui32 items. The unit multiplier is resolved once, when the converter is built,
// so an unsupported time unit is reported by GetMultiplierForDatetime up front.
TColumnConverter ArrowTimestampAsYqlDatetime(const std::shared_ptr<arrow::DataType>& targetType, bool isOptional, arrow::TimeUnit::type timeUnit) {
    const ui32 multiplier = GetMultiplierForDatetime(timeUnit);
    return [targetType, isOptional, multiplier](const std::shared_ptr<arrow::Array>& value) {
        if (isOptional) {
            return ArrowTimestampAsYqlDatetime<true, ui32>(targetType, value, multiplier);
        }
        return ArrowTimestampAsYqlDatetime<false, ui32>(targetType, value, multiplier);
    };
}
367+
368+
// Builds a converter from an arrow timestamp column to a YQL Datetime64 block
// with i64 items. The unit multiplier is resolved once, when the converter is
// built, so an unsupported time unit is reported by GetMultiplierForDatetime
// up front.
TColumnConverter ArrowTimestampAsYqlDatetime64(const std::shared_ptr<arrow::DataType>& targetType, bool isOptional, arrow::TimeUnit::type timeUnit) {
    const ui32 multiplier = GetMultiplierForDatetime(timeUnit);
    return [targetType, isOptional, multiplier](const std::shared_ptr<arrow::Array>& value) {
        if (isOptional) {
            return ArrowTimestampAsYqlDatetime<true, i64>(targetType, value, multiplier);
        }
        return ArrowTimestampAsYqlDatetime<false, i64>(targetType, value, multiplier);
    };
}
337375

@@ -624,6 +662,8 @@ TColumnConverter BuildCustomConverter(const std::shared_ptr<arrow::DataType>& or
624662
switch (slotItem) {
625663
case NUdf::EDataSlot::Datetime:
626664
return ArrowTimestampAsYqlDatetime(targetType, isOptional, timestampType.unit());
665+
case NUdf::EDataSlot::Datetime64:
666+
return ArrowTimestampAsYqlDatetime64(targetType, isOptional, timestampType.unit());
627667
case NUdf::EDataSlot::Timestamp:
628668
return ArrowTimestampAsYqlTimestamp(targetType, isOptional, timestampType.unit());
629669
case NUdf::EDataSlot::String:
@@ -654,7 +694,7 @@ TColumnConverter BuildCustomConverter(const std::shared_ptr<arrow::DataType>& or
654694
) {
655695
return [](const std::shared_ptr<arrow::Array>& value) {
656696
auto decimals = std::static_pointer_cast<arrow::Decimal128Array>(value);
657-
auto output = std::make_shared<arrow::FixedSizeBinaryArray>(arrow::fixed_size_binary(16), decimals->length(), decimals->values());
697+
auto output = std::make_shared<arrow::FixedSizeBinaryArray>(arrow::fixed_size_binary(16), decimals->length(), decimals->values(), decimals->null_bitmap(), decimals->null_count());
658698
return output;
659699
};
660700
}
@@ -691,6 +731,39 @@ TColumnConverter YqlBlockTzDateToArrow(const std::string& columnName, const std:
691731
};
692732
}
693733

734+
template <bool isOptional>
735+
TColumnConverter DecimalToArrowBaseConverter(const std::shared_ptr<arrow::DataType>& targetType) {
736+
return [targetType](const std::shared_ptr<arrow::Array>& value) {
737+
arrow::Decimal128Builder builder(targetType, arrow::default_memory_pool());
738+
::NYql::NUdf::TFixedSizeBlockReader<NYql::NDecimal::TInt128, isOptional> reader;
739+
740+
for (i64 i = 0; i < value->length(); ++i) {
741+
NUdf::TBlockItem item = reader.GetItem(*value->data(), i);
742+
743+
if (!item) {
744+
THROW_ARROW_NOT_OK(builder.AppendNull());
745+
continue;
746+
}
747+
748+
NYql::NDecimal::TInt128 val = item.GetInt128();
749+
arrow::Decimal128 newValue((uint8_t*)(&val));
750+
THROW_ARROW_NOT_OK(builder.Append(newValue));
751+
}
752+
753+
std::shared_ptr<arrow::Array> array;
754+
THROW_ARROW_NOT_OK(builder.Finish(&array));
755+
return array;
756+
};
757+
}
758+
759+
// Picks the optional or non-optional instantiation of the decimal converter
// according to the runtime isOptional flag.
TColumnConverter DecimalToArrowConverter(bool isOptional, const std::shared_ptr<arrow::DataType>& targetType) {
    return isOptional
        ? DecimalToArrowBaseConverter<true>(targetType)
        : DecimalToArrowBaseConverter<false>(targetType);
}
766+
694767
}
695768

696769
namespace NYql::NDq {
@@ -728,7 +801,9 @@ TColumnConverter BuildOutputColumnConverter(const std::string& columnName, NKiki
728801
YQL_ENSURE(ConvertArrowType(columnType, yqlArrowType), "Got unsupported yql block type: " << *columnType << " in column " << columnName);
729802
YQL_ENSURE(S3ConvertArrowOutputType(columnType, s3OutputType), "Got unsupported s3 output block type: " << *columnType << " in column " << columnName);
730803

731-
if (columnType->IsOptional()) {
804+
bool isOptional = columnType->IsOptional();
805+
806+
if (isOptional) {
732807
columnType = AS_TYPE(TOptionalType, columnType)->GetItemType();
733808
}
734809
YQL_ENSURE(columnType->IsData(), "Allowed only data types for S3 output, but got: " << *columnType << " in column " << columnName);
@@ -752,13 +827,18 @@ TColumnConverter BuildOutputColumnConverter(const std::string& columnName, NKiki
752827
case NUdf::EDataSlot::Datetime:
753828
case NUdf::EDataSlot::Timestamp:
754829
return {};
830+
case NUdf::EDataSlot::Date32:
831+
case NUdf::EDataSlot::Datetime64:
832+
case NUdf::EDataSlot::Timestamp64:
755833
case NUdf::EDataSlot::Utf8:
756834
case NUdf::EDataSlot::Json:
757835
return ArrowComputeConvertor(columnName, yqlArrowType, s3OutputType);
758836
case NUdf::EDataSlot::TzDate:
759837
case NUdf::EDataSlot::TzDatetime:
760838
case NUdf::EDataSlot::TzTimestamp:
761839
return YqlBlockTzDateToArrow(columnName, yqlArrowType);
840+
case NUdf::EDataSlot::Decimal:
841+
return DecimalToArrowConverter(isOptional, s3OutputType);
762842
default:
763843
YQL_ENSURE(false, "Got unsupported s3 output block type: " << *columnType << " in column " << columnName);
764844
}
@@ -815,7 +895,7 @@ std::shared_ptr<arrow::RecordBatch> ConvertArrowColumns(std::shared_ptr<arrow::R
815895
}
816896

817897
// Type conversion same as in ClickHouseClient.SerializeFormat udf
818-
bool S3ConvertArrowOutputType(NUdf::EDataSlot slot, std::shared_ptr<arrow::DataType>& type) {
898+
bool S3ConvertArrowOutputType(NUdf::EDataSlot slot, std::shared_ptr<arrow::DataType>& type, TType* itemType) {
819899
switch (slot) {
820900
case NUdf::EDataSlot::Int8:
821901
type = arrow::int8();
@@ -835,11 +915,17 @@ bool S3ConvertArrowOutputType(NUdf::EDataSlot slot, std::shared_ptr<arrow::DataT
835915
case NUdf::EDataSlot::Int32:
836916
type = arrow::int32();
837917
return true;
918+
case NUdf::EDataSlot::Date32:
919+
type = arrow::date32();
920+
return true;
838921
case NUdf::EDataSlot::Datetime:
839922
case NUdf::EDataSlot::TzDatetime:
840923
case NUdf::EDataSlot::Uint32:
841924
type = arrow::uint32();
842925
return true;
926+
case NUdf::EDataSlot::Datetime64:
927+
type = arrow::timestamp(arrow::TimeUnit::SECOND, "UTC");
928+
return true;
843929
case NUdf::EDataSlot::Int64:
844930
type = arrow::int64();
845931
return true;
@@ -857,7 +943,17 @@ bool S3ConvertArrowOutputType(NUdf::EDataSlot slot, std::shared_ptr<arrow::DataT
857943
case NUdf::EDataSlot::Json:
858944
type = arrow::binary();
859945
return true;
946+
case NUdf::EDataSlot::Decimal: {
947+
if (itemType) {
948+
auto [precision, scale] = static_cast<TDataDecimalType*>(itemType)->GetParams();
949+
type = arrow::decimal128(precision, scale);
950+
} else {
951+
type = arrow::decimal128(22, 9);
952+
}
953+
return true;
954+
}
860955
case NUdf::EDataSlot::Timestamp:
956+
case NUdf::EDataSlot::Timestamp64:
861957
case NUdf::EDataSlot::TzTimestamp:
862958
type = arrow::timestamp(arrow::TimeUnit::MICRO, "UTC");
863959
return true;
@@ -880,7 +976,7 @@ bool S3ConvertArrowOutputType(TType* itemType, std::shared_ptr<arrow::DataType>&
880976
return false;
881977
}
882978

883-
return S3ConvertArrowOutputType(*slot, type);
979+
return S3ConvertArrowOutputType(*slot, type, itemType);
884980
}
885981

886982
void BuildOutputColumnConverters(const NKikimr::NMiniKQL::TStructType* outputStructType, std::vector<TColumnConverter>& columnConverters) {

ydb/library/yql/providers/s3/actors/yql_arrow_column_converters.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ std::shared_ptr<arrow::RecordBatch> ConvertArrowColumns(
3535
std::shared_ptr<arrow::RecordBatch> batch,
3636
std::vector<TColumnConverter>& columnConverters);
3737

38-
bool S3ConvertArrowOutputType(NUdf::EDataSlot slot, std::shared_ptr<arrow::DataType>& type);
38+
bool S3ConvertArrowOutputType(NUdf::EDataSlot slot, std::shared_ptr<arrow::DataType>& type, NKikimr::NMiniKQL::TType* itemType = nullptr);
3939
bool S3ConvertArrowOutputType(NKikimr::NMiniKQL::TType* itemType, std::shared_ptr<arrow::DataType>& type);
4040

4141
void BuildOutputColumnConverters(

ydb/library/yql/providers/s3/common/util.cpp

Lines changed: 65 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,20 @@ struct TTypeError {
3535
TString Error = "unsupported type";
3636
};
3737

38+
std::optional<TTypeError> ValidateIoDataType(const TDataExprType* type, std::vector<EDataSlot> extraTypes = {}) {
39+
const auto dataSlot = type->GetSlot();
40+
if (IsDataTypeBigDate(dataSlot)) {
41+
return TTypeError{type, "big dates is not supported"};
42+
}
43+
if (IsDataTypeNumeric(dataSlot) || IsDataTypeDateOrTzDate(dataSlot)) {
44+
return std::nullopt;
45+
}
46+
if (IsIn({EDataSlot::Bool, EDataSlot::String, EDataSlot::Utf8, EDataSlot::Json, EDataSlot::Uuid}, dataSlot) || IsIn(extraTypes, dataSlot)) {
47+
return std::nullopt;
48+
}
49+
return TTypeError{type};
50+
}
51+
3852
std::optional<TTypeError> ValidateJsonListIoType(const TTypeAnnotationNode* type, std::function<std::optional<TTypeError>(const TTypeAnnotationNode*)> defaultHandler) {
3953
switch (type->GetKind()) {
4054
case ETypeAnnotationKind::Null:
@@ -103,6 +117,51 @@ std::optional<TTypeError> ValidateJsonListInputType(const TTypeAnnotationNode* t
103117
});
104118
}
105119

120+
std::optional<TTypeError> ValidateParquetIoType(const TTypeAnnotationNode* type, bool underOptional = false) {
121+
switch (type->GetKind()) {
122+
case ETypeAnnotationKind::Data: {
123+
const auto dataSlot = type->Cast<TDataExprType>()->GetSlot();
124+
if (IsDataTypeNumeric(dataSlot) || IsDataTypeDateOrTzDate(dataSlot) || IsDataTypeDecimal(dataSlot) || IsDataTypeBigDate(dataSlot)) {
125+
return std::nullopt;
126+
}
127+
if (IsIn({EDataSlot::Bool, EDataSlot::String, EDataSlot::Utf8, EDataSlot::Json, EDataSlot::Uuid}, dataSlot)) {
128+
return std::nullopt;
129+
}
130+
return TTypeError{type};
131+
}
132+
case ETypeAnnotationKind::Optional: {
133+
if (underOptional) {
134+
return TTypeError{type, "double optional is not supported"};
135+
}
136+
return ValidateParquetIoType(type->Cast<TOptionalExprType>()->GetItemType(), true);
137+
}
138+
case ETypeAnnotationKind::List: {
139+
if (underOptional) {
140+
return TTypeError{type, "list under optional is not supported"};
141+
}
142+
return ValidateIoDataType(type->Cast<TDataExprType>());
143+
}
144+
case ETypeAnnotationKind::Tuple: {
145+
if (underOptional) {
146+
return TTypeError{type, "tuple under optional is not supported"};
147+
}
148+
for (const auto* item : type->Cast<TTupleExprType>()->GetItems()) {
149+
if (const auto error = ValidateIoDataType(item->Cast<TDataExprType>())) {
150+
return error;
151+
}
152+
}
153+
return std::nullopt;
154+
}
155+
case ETypeAnnotationKind::Pg: {
156+
return std::nullopt;
157+
}
158+
default: {
159+
break;
160+
}
161+
}
162+
return TTypeError{type};
163+
}
164+
106165
// Type compatible with Yson2.From udf
107166
std::optional<TTypeError> DefaultJsonListOutputTypeHandler(const TTypeAnnotationNode* type) {
108167
if (type->GetKind() == ETypeAnnotationKind::Variant) {
@@ -115,18 +174,8 @@ std::optional<TTypeError> ValidateJsonListOutputType(const TTypeAnnotationNode*
115174
return ValidateJsonListIoType(type, &DefaultJsonListOutputTypeHandler);
116175
}
117176

118-
std::optional<TTypeError> ValidateIoDataType(const TDataExprType* type, std::vector<EDataSlot> extraTypes = {}) {
119-
const auto dataSlot = type->GetSlot();
120-
if (IsDataTypeBigDate(dataSlot)) {
121-
return TTypeError{type, "big dates is not supported"};
122-
}
123-
if (IsDataTypeNumeric(dataSlot) || IsDataTypeDateOrTzDate(dataSlot)) {
124-
return std::nullopt;
125-
}
126-
if (IsIn({EDataSlot::Bool, EDataSlot::String, EDataSlot::Utf8, EDataSlot::Json, EDataSlot::Uuid}, dataSlot) || IsIn(extraTypes, dataSlot)) {
127-
return std::nullopt;
128-
}
129-
return TTypeError{type};
177+
std::optional<TTypeError> ValidateParquetOutputType(const TTypeAnnotationNode* type) {
178+
return ValidateParquetIoType(type);
130179
}
131180

132181
// Data type compatible with ClickHouseClient.ParseBlocks udf and S3 coro read actor
@@ -247,6 +296,10 @@ bool ValidateS3WriteSchema(TPositionHandle pos, std::string_view format, const T
247296
return ValidateIoSchema(pos, schemaStructRowType, "S3 json_list output format", ctx, &ValidateJsonListOutputType);
248297
}
249298

299+
if (format == "parquet"sv) {
300+
return ValidateIoSchema(pos, schemaStructRowType, "S3 parquet output format", ctx, &ValidateParquetOutputType);
301+
}
302+
250303
return ValidateIoSchema(pos, schemaStructRowType, TStringBuilder() << "S3 " << format << " output format", ctx, [](const TTypeAnnotationNode* type) {
251304
return ValidateGenericIoType(type, &ValidateClickHouseUdfDataType);
252305
});

0 commit comments

Comments
 (0)