Skip to content

Commit dc90f2e

Browse files
committed
feat: transform function
1 parent d05a9b2 commit dc90f2e

File tree

12 files changed

+812
-65
lines changed

12 files changed

+812
-65
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ set(ICEBERG_SOURCES
2626
partition_field.cc
2727
partition_spec.cc
2828
transform.cc
29+
transform/transform_function.cc
30+
transform/transform_spec.cc
2931
type.cc)
3032

3133
set(ICEBERG_STATIC_BUILD_INTERFACE_LIBS)

src/iceberg/arrow_c_data.h

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,4 +73,44 @@ struct ArrowArray {
7373

7474
#endif // ARROW_C_DATA_INTERFACE
7575

76+
#ifndef ARROW_C_STREAM_INTERFACE
77+
# define ARROW_C_STREAM_INTERFACE
78+
79+
struct ArrowArrayStream {
80+
// Callback to get the stream type
81+
// (will be the same for all arrays in the stream).
82+
//
83+
// Return value: 0 if successful, an `errno`-compatible error code otherwise.
84+
//
85+
// If successful, the ArrowSchema must be released independently from the stream.
86+
int (*get_schema)(struct ArrowArrayStream*, struct ArrowSchema* out);
87+
88+
// Callback to get the next array
89+
// (if no error and the array is released, the stream has ended)
90+
//
91+
// Return value: 0 if successful, an `errno`-compatible error code otherwise.
92+
//
93+
// If successful, the ArrowArray must be released independently from the stream.
94+
int (*get_next)(struct ArrowArrayStream*, struct ArrowArray* out);
95+
96+
// Callback to get optional detailed error information.
97+
// This must only be called if the last stream operation failed
98+
// with a non-0 return code.
99+
//
100+
// Return value: pointer to a null-terminated character array describing
101+
// the last error, or NULL if no description is available.
102+
//
103+
// The returned pointer is only valid until the next operation on this stream
104+
// (including release).
105+
const char* (*get_last_error)(struct ArrowArrayStream*);
106+
107+
// Release callback: release the stream's own resources.
108+
// Note that arrays returned by `get_next` must be individually released.
109+
void (*release)(struct ArrowArrayStream*);
110+
// Opaque producer-specific data
111+
void* private_data;
112+
};
113+
114+
#endif // ARROW_C_STREAM_INTERFACE
115+
76116
} // extern "C"

src/iceberg/transform.cc

Lines changed: 77 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -21,55 +21,98 @@
2121

2222
#include <format>
2323

24-
namespace iceberg {
24+
#include <nanoarrow/nanoarrow.hpp>
25+
26+
#include "iceberg/transform/transform_function.h"
27+
#include "iceberg/transform/transform_spec.h"
28+
#include "iceberg/type.h"
2529

30+
namespace iceberg {
2631
namespace {
27-
/// \brief Get the relative transform name
28-
constexpr std::string_view ToString(TransformType type) {
29-
switch (type) {
30-
case TransformType::kUnknown:
31-
return "unknown";
32-
case TransformType::kIdentity:
33-
return "identity";
34-
case TransformType::kBucket:
35-
return "bucket";
36-
case TransformType::kTruncate:
37-
return "truncate";
38-
case TransformType::kYear:
39-
return "year";
40-
case TransformType::kMonth:
41-
return "month";
42-
case TransformType::kDay:
43-
return "day";
44-
case TransformType::kHour:
45-
return "hour";
46-
case TransformType::kVoid:
47-
return "void";
48-
default:
49-
return "invalid";
50-
}
32+
33+
int32_t GetInt32FromParamArray(ArrowArray const& param_array) {
34+
ArrowArrayView view;
35+
ArrowArrayViewInitFromType(&view, NANOARROW_TYPE_INT32);
36+
NANOARROW_THROW_NOT_OK(ArrowArrayViewSetArray(&view, &param_array, nullptr));
37+
const auto value = view.buffer_views[1].data.as_int32[0];
38+
ArrowArrayViewReset(&view);
39+
return value;
5140
}
41+
5242
} // namespace
5343

54-
TransformFunction::TransformFunction(TransformType type) : transform_type_(type) {}
44+
TransformFunction::TransformFunction(TransformType transform_type,
45+
std::shared_ptr<Type> source_type)
46+
: transform_type_(transform_type), source_type_(std::move(source_type)) {}
5547

5648
TransformType TransformFunction::transform_type() const { return transform_type_; }
5749

50+
std::shared_ptr<Type> const& TransformFunction::source_type() const {
51+
return source_type_;
52+
}
53+
5854
std::string TransformFunction::ToString() const {
5955
return std::format("{}", iceberg::ToString(transform_type_));
6056
}
6157

6258
bool TransformFunction::Equals(const TransformFunction& other) const {
63-
return transform_type_ == other.transform_type_;
59+
return transform_type_ == other.transform_type_ && *source_type_ == *other.source_type_;
6460
}
6561

66-
IdentityTransformFunction::IdentityTransformFunction()
67-
: TransformFunction(TransformType::kIdentity) {}
68-
69-
expected<ArrowArray, Error> IdentityTransformFunction::Transform(
70-
const ArrowArray& input) {
71-
return unexpected<Error>({.kind = ErrorKind::kNotSupported,
72-
.message = "IdentityTransformFunction::Transform"});
62+
expected<std::unique_ptr<TransformFunction>, Error> TransformFunction::Make(
63+
const TransformSpec& spec) {
64+
switch (spec.transform_type) {
65+
case TransformType::kIdentity:
66+
return std::make_unique<IdentityTransform>(spec.source_type);
67+
case TransformType::kBucket: {
68+
if (!spec.params_opt.has_value()) {
69+
return unexpected<Error>(
70+
{.kind = ErrorKind::kInvalidArgument,
71+
.message = "Bucket transform requires 1 parameter (number of buckets), but "
72+
"none were provided."});
73+
}
74+
if (spec.params_opt->length != 1) {
75+
return unexpected<Error>(
76+
{.kind = ErrorKind::kInvalidArgument,
77+
.message = std::format("Bucket transform expects exactly 1 parameter "
78+
"(number of buckets), but got {}.",
79+
spec.params_opt->length)});
80+
}
81+
auto num_buckets = GetInt32FromParamArray(spec.params_opt.value());
82+
return std::make_unique<BucketTransform>(spec.source_type, num_buckets);
83+
}
84+
case TransformType::kTruncate: {
85+
if (!spec.params_opt.has_value()) {
86+
return unexpected<Error>({.kind = ErrorKind::kInvalidArgument,
87+
.message = "Truncate transform requires 1 parameter "
88+
"(width), but none were provided."});
89+
}
90+
if (spec.params_opt->length != 1) {
91+
return unexpected<Error>(
92+
{.kind = ErrorKind::kInvalidArgument,
93+
.message = std::format(
94+
"Truncate transform expects exactly 1 parameter (width), but got {}.",
95+
spec.params_opt->length)});
96+
}
97+
auto width = GetInt32FromParamArray(spec.params_opt.value());
98+
return std::make_unique<TruncateTransform>(spec.source_type, width);
99+
}
100+
case TransformType::kYear:
101+
return std::make_unique<YearTransform>(spec.source_type);
102+
case TransformType::kMonth:
103+
return std::make_unique<MonthTransform>(spec.source_type);
104+
case TransformType::kDay:
105+
return std::make_unique<DayTransform>(spec.source_type);
106+
case TransformType::kHour:
107+
return std::make_unique<HourTransform>(spec.source_type);
108+
case TransformType::kVoid:
109+
return std::make_unique<VoidTransform>(spec.source_type);
110+
default:
111+
return unexpected<Error>(
112+
{.kind = ErrorKind::kInvalidArgument,
113+
.message = std::format("Unsupported or invalid transform type: {}",
114+
iceberg::ToString(spec.transform_type))});
115+
}
73116
}
74117

75118
} // namespace iceberg

src/iceberg/transform.h

Lines changed: 47 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -56,14 +56,42 @@ enum class TransformType {
5656
kVoid,
5757
};
5858

59+
/// \brief Get the relative transform name
60+
constexpr std::string_view ToString(TransformType type) {
61+
switch (type) {
62+
case TransformType::kUnknown:
63+
return "unknown";
64+
case TransformType::kIdentity:
65+
return "identity";
66+
case TransformType::kBucket:
67+
return "bucket";
68+
case TransformType::kTruncate:
69+
return "truncate";
70+
case TransformType::kYear:
71+
return "year";
72+
case TransformType::kMonth:
73+
return "month";
74+
case TransformType::kDay:
75+
return "day";
76+
case TransformType::kHour:
77+
return "hour";
78+
case TransformType::kVoid:
79+
return "void";
80+
}
81+
}
82+
5983
/// \brief A transform function used for partitioning.
6084
class ICEBERG_EXPORT TransformFunction : public util::Formattable {
6185
public:
62-
explicit TransformFunction(TransformType type);
86+
TransformFunction(TransformType transform_type, std::shared_ptr<Type> source_type);
6387
/// \brief Transform an input array to a new array
6488
virtual expected<ArrowArray, Error> Transform(const ArrowArray& data) = 0;
6589
/// \brief Get the transform type
66-
virtual TransformType transform_type() const;
90+
TransformType transform_type() const;
91+
/// \brief Get the source type of transform function
92+
std::shared_ptr<Type> const& source_type() const;
93+
/// \brief Get the result type of transform function
94+
virtual expected<std::shared_ptr<Type>, Error> ResultType() const = 0;
6795

6896
std::string ToString() const override;
6997

@@ -75,18 +103,28 @@ class ICEBERG_EXPORT TransformFunction : public util::Formattable {
75103
return !(lhs == rhs);
76104
}
77105

106+
/// \brief Make a TransformFunction instance based on the given TransformSpec.
107+
///
108+
/// This method examines the transform type and associated parameters within the
109+
/// provided TransformSpec, and returns a corresponding implementation of
110+
/// TransformFunction.
111+
///
112+
/// The function may fail if the specified transform type is not recognized or
113+
/// supported.
114+
///
115+
/// \param spec The TransformSpec that contains the transform type and the associated
116+
/// parameters.
117+
// \return An expected result containing a unique pointer to the
118+
/// corresponding TransformFunction implementation, or an error if the creation fails.
119+
static expected<std::unique_ptr<TransformFunction>, Error> Make(
120+
const TransformSpec& spec);
121+
78122
private:
79123
/// \brief Compare two partition specs for equality.
80124
[[nodiscard]] virtual bool Equals(const TransformFunction& other) const;
81125

82126
TransformType transform_type_;
83-
};
84-
85-
class IdentityTransformFunction : public TransformFunction {
86-
public:
87-
IdentityTransformFunction();
88-
/// \brief Transform will take an input array and transform it into a new array.
89-
expected<ArrowArray, Error> Transform(const ArrowArray& input) override;
127+
std::shared_ptr<Type> source_type_;
90128
};
91129

92130
} // namespace iceberg

0 commit comments

Comments
 (0)