Skip to content

Commit 67a266d

Browse files
committed
feat: transform function
1 parent d05a9b2 commit 67a266d

14 files changed

+887
-70
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ set(ICEBERG_SOURCES
2626
partition_field.cc
2727
partition_spec.cc
2828
transform.cc
29+
transform/transform_factory.cc
30+
transform/transform_function.cc
31+
transform/transform_spec.cc
2932
type.cc)
3033

3134
set(ICEBERG_STATIC_BUILD_INTERFACE_LIBS)

src/iceberg/arrow_c_data.h

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,4 +73,44 @@ struct ArrowArray {
7373

7474
#endif // ARROW_C_DATA_INTERFACE
7575

76+
#ifndef ARROW_C_STREAM_INTERFACE
77+
# define ARROW_C_STREAM_INTERFACE
78+
79+
struct ArrowArrayStream {
80+
// Callback to get the stream type
81+
// (will be the same for all arrays in the stream).
82+
//
83+
// Return value: 0 if successful, an `errno`-compatible error code otherwise.
84+
//
85+
// If successful, the ArrowSchema must be released independently from the stream.
86+
int (*get_schema)(struct ArrowArrayStream*, struct ArrowSchema* out);
87+
88+
// Callback to get the next array
89+
// (if no error and the array is released, the stream has ended)
90+
//
91+
// Return value: 0 if successful, an `errno`-compatible error code otherwise.
92+
//
93+
// If successful, the ArrowArray must be released independently from the stream.
94+
int (*get_next)(struct ArrowArrayStream*, struct ArrowArray* out);
95+
96+
// Callback to get optional detailed error information.
97+
// This must only be called if the last stream operation failed
98+
// with a non-0 return code.
99+
//
100+
// Return value: pointer to a null-terminated character array describing
101+
// the last error, or NULL if no description is available.
102+
//
103+
// The returned pointer is only valid until the next operation on this stream
104+
// (including release).
105+
const char* (*get_last_error)(struct ArrowArrayStream*);
106+
107+
// Release callback: release the stream's own resources.
108+
// Note that arrays returned by `get_next` must be individually released.
109+
void (*release)(struct ArrowArrayStream*);
110+
// Opaque producer-specific data
111+
void* private_data;
112+
};
113+
114+
#endif // ARROW_C_STREAM_INTERFACE
115+
76116
} // extern "C"

src/iceberg/transform.cc

Lines changed: 8 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -21,55 +21,24 @@
2121

2222
#include <format>
2323

24-
namespace iceberg {
24+
#include "iceberg/type.h"
2525

26-
namespace {
27-
/// \brief Get the relative transform name
28-
constexpr std::string_view ToString(TransformType type) {
29-
switch (type) {
30-
case TransformType::kUnknown:
31-
return "unknown";
32-
case TransformType::kIdentity:
33-
return "identity";
34-
case TransformType::kBucket:
35-
return "bucket";
36-
case TransformType::kTruncate:
37-
return "truncate";
38-
case TransformType::kYear:
39-
return "year";
40-
case TransformType::kMonth:
41-
return "month";
42-
case TransformType::kDay:
43-
return "day";
44-
case TransformType::kHour:
45-
return "hour";
46-
case TransformType::kVoid:
47-
return "void";
48-
default:
49-
return "invalid";
50-
}
51-
}
52-
} // namespace
26+
namespace iceberg {
5327

54-
TransformFunction::TransformFunction(TransformType type) : transform_type_(type) {}
28+
TransformFunction::TransformFunction(TransformType transform_type,
29+
std::shared_ptr<Type> source_type)
30+
: transform_type_(transform_type), source_type_(std::move(source_type)) {}
5531

5632
TransformType TransformFunction::transform_type() const { return transform_type_; }
5733

34+
std::shared_ptr<Type> TransformFunction::source_type() const { return source_type_; }
35+
5836
std::string TransformFunction::ToString() const {
5937
return std::format("{}", iceberg::ToString(transform_type_));
6038
}
6139

6240
bool TransformFunction::Equals(const TransformFunction& other) const {
63-
return transform_type_ == other.transform_type_;
64-
}
65-
66-
IdentityTransformFunction::IdentityTransformFunction()
67-
: TransformFunction(TransformType::kIdentity) {}
68-
69-
expected<ArrowArray, Error> IdentityTransformFunction::Transform(
70-
const ArrowArray& input) {
71-
return unexpected<Error>({.kind = ErrorKind::kNotSupported,
72-
.message = "IdentityTransformFunction::Transform"});
41+
return transform_type_ == other.transform_type_ && *source_type_ == *other.source_type_;
7342
}
7443

7544
} // namespace iceberg

src/iceberg/transform.h

Lines changed: 34 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -56,14 +56,45 @@ enum class TransformType {
5656
kVoid,
5757
};
5858

59+
/// \brief Get the relative transform name
60+
constexpr std::string_view ToString(TransformType type) {
61+
switch (type) {
62+
case TransformType::kUnknown:
63+
return "unknown";
64+
case TransformType::kIdentity:
65+
return "identity";
66+
case TransformType::kBucket:
67+
return "bucket";
68+
case TransformType::kTruncate:
69+
return "truncate";
70+
case TransformType::kYear:
71+
return "year";
72+
case TransformType::kMonth:
73+
return "month";
74+
case TransformType::kDay:
75+
return "day";
76+
case TransformType::kHour:
77+
return "hour";
78+
case TransformType::kVoid:
79+
return "void";
80+
default:
81+
return "invalid";
82+
}
83+
}
84+
5985
/// \brief A transform function used for partitioning.
6086
class ICEBERG_EXPORT TransformFunction : public util::Formattable {
6187
public:
62-
explicit TransformFunction(TransformType type);
88+
explicit TransformFunction(TransformType transform_type,
89+
std::shared_ptr<Type> source_type);
6390
/// \brief Transform an input array to a new array
6491
virtual expected<ArrowArray, Error> Transform(const ArrowArray& data) = 0;
6592
/// \brief Get the transform type
66-
virtual TransformType transform_type() const;
93+
TransformType transform_type() const;
94+
/// \brief Get the source type of transform function
95+
std::shared_ptr<Type> source_type() const;
96+
/// \brief Get the result type of transform function
97+
virtual expected<std::shared_ptr<Type>, Error> result_type() const = 0;
6798

6899
std::string ToString() const override;
69100

@@ -80,13 +111,7 @@ class ICEBERG_EXPORT TransformFunction : public util::Formattable {
80111
[[nodiscard]] virtual bool Equals(const TransformFunction& other) const;
81112

82113
TransformType transform_type_;
83-
};
84-
85-
class IdentityTransformFunction : public TransformFunction {
86-
public:
87-
IdentityTransformFunction();
88-
/// \brief Transform will take an input array and transform it into a new array.
89-
expected<ArrowArray, Error> Transform(const ArrowArray& input) override;
114+
std::shared_ptr<Type> source_type_;
90115
};
91116

92117
} // namespace iceberg
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/transform/transform_factory.h"
21+
22+
#include <format>
23+
24+
#include <nanoarrow/nanoarrow.hpp>
25+
26+
#include "iceberg/transform/transform_function.h"
27+
#include "iceberg/transform/transform_spec.h"
28+
29+
namespace iceberg {
30+
31+
namespace {
32+
33+
int32_t GetInt32FromParamArray(ArrowArray const& param_array) {
34+
ArrowArrayView view;
35+
ArrowArrayViewInitFromType(&view, NANOARROW_TYPE_INT32);
36+
NANOARROW_THROW_NOT_OK(ArrowArrayViewSetArray(&view, &param_array, nullptr));
37+
const auto value = view.buffer_views[1].data.as_int32[0];
38+
ArrowArrayViewReset(&view);
39+
return value;
40+
}
41+
42+
} // namespace
43+
44+
expected<std::unique_ptr<TransformFunction>, Error> TransformFactory::create(
45+
const TransformSpec& spec) {
46+
switch (spec.transform_type) {
47+
case TransformType::kIdentity:
48+
return std::make_unique<IdentityTransform>(spec.source_type);
49+
case TransformType::kBucket: {
50+
if (!spec.params_opt.has_value()) {
51+
return unexpected<Error>(
52+
{.kind = ErrorKind::kInvalidArgument,
53+
.message = "Bucket transform requires 1 parameter (number of buckets), but "
54+
"none were provided."});
55+
}
56+
if (spec.params_opt->length != 1) {
57+
return unexpected<Error>(
58+
{.kind = ErrorKind::kInvalidArgument,
59+
.message = std::format("Bucket transform expects exactly 1 parameter "
60+
"(number of buckets), but got {}.",
61+
spec.params_opt->length)});
62+
}
63+
auto num_buckets = GetInt32FromParamArray(spec.params_opt.value());
64+
return std::make_unique<BucketTransform>(spec.source_type, num_buckets);
65+
}
66+
case TransformType::kTruncate: {
67+
if (!spec.params_opt.has_value()) {
68+
return unexpected<Error>({.kind = ErrorKind::kInvalidArgument,
69+
.message = "Truncate transform requires 1 parameter "
70+
"(width), but none were provided."});
71+
}
72+
if (spec.params_opt->length != 1) {
73+
return unexpected<Error>(
74+
{.kind = ErrorKind::kInvalidArgument,
75+
.message = std::format(
76+
"Truncate transform expects exactly 1 parameter (width), but got {}.",
77+
spec.params_opt->length)});
78+
}
79+
auto width = GetInt32FromParamArray(spec.params_opt.value());
80+
return std::make_unique<TruncateTransform>(spec.source_type, width);
81+
}
82+
case TransformType::kYear:
83+
return std::make_unique<YearTransform>(spec.source_type);
84+
case TransformType::kMonth:
85+
return std::make_unique<MonthTransform>(spec.source_type);
86+
case TransformType::kDay:
87+
return std::make_unique<DayTransform>(spec.source_type);
88+
case TransformType::kHour:
89+
return std::make_unique<HourTransform>(spec.source_type);
90+
case TransformType::kVoid:
91+
return std::make_unique<VoidTransform>(spec.source_type);
92+
default:
93+
return unexpected<Error>(
94+
{.kind = ErrorKind::kInvalidArgument,
95+
.message = std::format("Unsupported or invalid transform type: {}",
96+
ToString(spec.transform_type))});
97+
}
98+
}
99+
100+
} // namespace iceberg
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
/// \file iceberg/transform/transform_factory.h
23+
24+
#include <memory>
25+
26+
#include "iceberg/error.h"
27+
#include "iceberg/expected.h"
28+
#include "iceberg/type_fwd.h"
29+
30+
namespace iceberg {
31+
32+
/// \brief Factory class for creating TransformFunction instances from TransformSpec.
33+
///
34+
/// This class provides a static interface for instantiating specific transform
35+
/// implementations based on the transform type and associated parameters
36+
/// encapsulated within a TransformSpec.
37+
class TransformFactory {
38+
public:
39+
/// \brief Creates a TransformFunction instance based on the given TransformSpec.
40+
///
41+
/// This method examines the transform type and associated parameters within the
42+
/// provided TransformSpec, and returns a corresponding implementation of
43+
/// TransformFunction.
44+
///
45+
/// The function may fail if the specified transform type is not recognized or
46+
/// supported.
47+
///
48+
/// \param spec The TransformSpec that contains the transform type and the associated
49+
/// parameters.
50+
// \return An expected result containing a unique pointer to the
51+
/// corresponding TransformFunction implementation, or an error if the creation fails.
52+
static expected<std::unique_ptr<TransformFunction>, Error> create(
53+
const TransformSpec& spec);
54+
};
55+
56+
} // namespace iceberg

0 commit comments

Comments
 (0)