Skip to content

Commit 96e3a1a

Browse files
committed
feat: add plugins and testing
1 parent 2359848 commit 96e3a1a

File tree

200 files changed

+29447
-0
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

200 files changed

+29447
-0
lines changed

src/paimon/CMakeLists.txt

Lines changed: 555 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
# Copyright 2024-present Alibaba Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
if(PAIMON_ENABLE_AVRO)
16+
17+
set(PAIMON_AVRO_FILE_FORMAT
18+
avro_adaptor.cpp
19+
avro_array_data_getter.cpp
20+
avro_file_batch_reader.cpp
21+
avro_file_format.cpp
22+
avro_file_format_factory.cpp
23+
avro_format_writer.cpp
24+
avro_input_stream_impl.cpp
25+
avro_output_stream_impl.cpp
26+
avro_record_converter.cpp
27+
avro_record_data_getter.cpp
28+
avro_schema_converter.cpp)
29+
30+
add_paimon_lib(paimon_avro_file_format
31+
SOURCES
32+
${PAIMON_AVRO_FILE_FORMAT}
33+
EXTRA_INCLUDES
34+
${AVRO_INCLUDE_DIR}
35+
DEPENDENCIES
36+
paimon_shared
37+
avro
38+
STATIC_LINK_LIBS
39+
arrow
40+
glog
41+
fmt
42+
avro
43+
tbb
44+
dl
45+
Threads::Threads
46+
SHARED_LINK_LIBS
47+
paimon_shared
48+
SHARED_LINK_FLAGS
49+
${PAIMON_VERSION_SCRIPT_FLAGS})
50+
51+
if(PAIMON_BUILD_TESTS)
52+
add_paimon_test(avro_format_test
53+
SOURCES
54+
avro_adaptor_test.cpp
55+
avro_file_batch_reader_test.cpp
56+
avro_file_format_test.cpp
57+
avro_input_stream_impl_test.cpp
58+
avro_record_converter_test.cpp
59+
avro_schema_converter_test.cpp
60+
avro_writer_builder_test.cpp
61+
avro_array_data_getter_test.cpp
62+
EXTRA_INCLUDES
63+
${AVRO_INCLUDE_DIR}
64+
STATIC_LINK_LIBS
65+
paimon_shared
66+
test_utils_static
67+
"-Wl,--whole-archive"
68+
paimon_local_file_system_static
69+
paimon_avro_file_format_static
70+
"-Wl,--no-whole-archive"
71+
${GTEST_LINK_TOOLCHAIN})
72+
73+
endif()
74+
75+
endif()
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
/*
2+
* Copyright 2024-present Alibaba Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#include "paimon/format/avro/avro_adaptor.h"
18+
19+
#include <cstddef>
20+
#include <utility>
21+
22+
#include "arrow/api.h"
23+
#include "arrow/array/array_base.h"
24+
#include "arrow/array/array_binary.h"
25+
#include "arrow/array/array_nested.h"
26+
#include "arrow/array/array_primitive.h"
27+
#include "arrow/util/checked_cast.h"
28+
#include "avro/GenericDatum.hh"
29+
#include "avro/Node.hh"
30+
#include "avro/ValidSchema.hh"
31+
#include "paimon/status.h"
32+
33+
namespace paimon::avro {
34+
35+
Result<std::vector<::avro::GenericDatum>> AvroAdaptor::ConvertArrayToGenericDatums(
36+
const std::shared_ptr<arrow::Array>& array, const ::avro::ValidSchema& avro_schema) const {
37+
std::vector<::avro::GenericDatum> datums;
38+
for (int32_t i = 0; i < array->length(); i++) {
39+
PAIMON_ASSIGN_OR_RAISE(::avro::GenericDatum datum,
40+
ConvertArrayToGenericDatum(avro_schema, array, i));
41+
datums.push_back(datum);
42+
}
43+
return datums;
44+
}
45+
46+
Result<::avro::GenericDatum> AvroAdaptor::ConvertArrayToGenericDatum(
47+
const ::avro::ValidSchema& avro_schema, const std::shared_ptr<arrow::Array>& arrow_array,
48+
int32_t row_idx) const {
49+
std::shared_ptr<arrow::StructArray> struct_array =
50+
arrow::internal::checked_pointer_cast<arrow::StructArray>(arrow_array);
51+
if (struct_array == nullptr) {
52+
return Status::Invalid("arrow array should be struct array");
53+
}
54+
::avro::GenericDatum datum(avro_schema.root());
55+
auto& record = datum.value<::avro::GenericRecord>();
56+
const auto& node = avro_schema.root();
57+
const auto& fields = type_->fields();
58+
for (size_t i = 0; i < fields.size(); i++) {
59+
std::shared_ptr<arrow::Array> field_array = struct_array->field(i);
60+
switch (fields[i]->type()->id()) {
61+
case arrow::Type::type::BOOL: {
62+
auto array =
63+
arrow::internal::checked_pointer_cast<arrow::BooleanArray>(field_array);
64+
auto datum = ::avro::GenericDatum(avro_schema.root()->leafAt(i));
65+
SetValue<bool, arrow::BooleanArray>(row_idx, array, &datum);
66+
record.setFieldAt(i, datum);
67+
break;
68+
}
69+
case arrow::Type::type::INT8: {
70+
auto array = arrow::internal::checked_pointer_cast<arrow::Int8Array>(field_array);
71+
auto datum = ::avro::GenericDatum(avro_schema.root()->leafAt(i));
72+
// avro only support int32_t and int64_t
73+
SetValue<int32_t, arrow::Int8Array>(row_idx, array, &datum);
74+
record.setFieldAt(i, datum);
75+
break;
76+
}
77+
case arrow::Type::type::INT16: {
78+
auto array = arrow::internal::checked_pointer_cast<arrow::Int16Array>(field_array);
79+
auto datum = ::avro::GenericDatum(avro_schema.root()->leafAt(i));
80+
// avro only support int32_t and int64_t
81+
SetValue<int32_t, arrow::Int16Array>(row_idx, array, &datum);
82+
record.setFieldAt(i, datum);
83+
break;
84+
}
85+
case arrow::Type::type::INT32: {
86+
auto array = arrow::internal::checked_pointer_cast<arrow::Int32Array>(field_array);
87+
auto datum = ::avro::GenericDatum(avro_schema.root()->leafAt(i));
88+
SetValue<int32_t, arrow::Int32Array>(row_idx, array, &datum);
89+
record.setFieldAt(i, datum);
90+
break;
91+
}
92+
case arrow::Type::type::INT64: {
93+
auto array = arrow::internal::checked_pointer_cast<arrow::Int64Array>(field_array);
94+
auto datum = ::avro::GenericDatum(avro_schema.root()->leafAt(i));
95+
SetValue<int64_t, arrow::Int64Array>(row_idx, array, &datum);
96+
record.setFieldAt(i, datum);
97+
break;
98+
}
99+
case arrow::Type::type::FLOAT: {
100+
auto array = arrow::internal::checked_pointer_cast<arrow::FloatArray>(field_array);
101+
auto datum = ::avro::GenericDatum(avro_schema.root()->leafAt(i));
102+
SetValue<float, arrow::FloatArray>(row_idx, array, &datum);
103+
record.setFieldAt(i, datum);
104+
break;
105+
}
106+
case arrow::Type::type::DOUBLE: {
107+
auto array = arrow::internal::checked_pointer_cast<arrow::DoubleArray>(field_array);
108+
auto datum = ::avro::GenericDatum(avro_schema.root()->leafAt(i));
109+
SetValue<double, arrow::DoubleArray>(row_idx, array, &datum);
110+
record.setFieldAt(i, datum);
111+
break;
112+
}
113+
case arrow::Type::type::STRING: {
114+
auto array = arrow::internal::checked_pointer_cast<arrow::StringArray>(field_array);
115+
auto datum = ::avro::GenericDatum(avro_schema.root()->leafAt(i));
116+
SetValue<std::string, arrow::StringArray>(row_idx, array, &datum);
117+
record.setFieldAt(i, datum);
118+
break;
119+
}
120+
case arrow::Type::type::BINARY: {
121+
auto array = arrow::internal::checked_pointer_cast<arrow::BinaryArray>(field_array);
122+
auto datum = ::avro::GenericDatum(avro_schema.root()->leafAt(i));
123+
SetValue<std::vector<uint8_t>, arrow::BinaryArray>(row_idx, array, &datum);
124+
record.setFieldAt(i, datum);
125+
break;
126+
}
127+
case arrow::Type::type::STRUCT: {
128+
::avro::ValidSchema leaf_schema(node->leafAt(i));
129+
PAIMON_ASSIGN_OR_RAISE(
130+
::avro::GenericDatum datum,
131+
ConvertArrayToGenericDatum(leaf_schema, field_array, row_idx));
132+
record.setFieldAt(i, datum);
133+
break;
134+
}
135+
default: {
136+
return Status::Invalid("unsupported type");
137+
}
138+
}
139+
}
140+
return datum;
141+
}
142+
143+
} // namespace paimon::avro
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/*
2+
* Copyright 2024-present Alibaba Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#pragma once
18+
19+
#include <cstdint>
20+
#include <memory>
21+
#include <string>
22+
#include <string_view>
23+
#include <vector>
24+
25+
#include "arrow/api.h"
26+
#include "arrow/array/array_binary.h"
27+
#include "avro/GenericDatum.hh"
28+
#include "avro/ValidSchema.hh"
29+
#include "paimon/result.h"
30+
31+
namespace arrow {
32+
class Array;
33+
class DataType;
34+
} // namespace arrow
35+
namespace avro {
36+
class ValidSchema;
37+
} // namespace avro
38+
39+
namespace paimon::avro {
40+
41+
class AvroAdaptor {
42+
public:
43+
explicit AvroAdaptor(const std::shared_ptr<arrow::DataType>& type) : type_(type) {}
44+
45+
Result<std::vector<::avro::GenericDatum>> ConvertArrayToGenericDatums(
46+
const std::shared_ptr<arrow::Array>& array, const ::avro::ValidSchema& avro_schema) const;
47+
48+
private:
49+
Result<::avro::GenericDatum> ConvertArrayToGenericDatum(
50+
const ::avro::ValidSchema& avro_schema, const std::shared_ptr<arrow::Array>& arrow_array,
51+
int32_t row_idx) const;
52+
53+
template <typename T, typename A>
54+
static void SetValue(int32_t row_idx, const std::shared_ptr<A>& array,
55+
::avro::GenericDatum* datum);
56+
57+
std::shared_ptr<arrow::DataType> type_;
58+
};
59+
60+
template <typename T, typename A>
61+
void AvroAdaptor::SetValue(int32_t row_idx, const std::shared_ptr<A>& array,
62+
::avro::GenericDatum* datum) {
63+
if (datum->isUnion()) {
64+
if (!array->IsNull(row_idx)) {
65+
datum->selectBranch(1);
66+
datum->value<T>() = array->Value(row_idx);
67+
}
68+
} else {
69+
datum->value<T>() = array->Value(row_idx);
70+
}
71+
}
72+
73+
template <>
74+
inline void AvroAdaptor::SetValue<std::string, arrow::StringArray>(
75+
int32_t row_idx, const std::shared_ptr<arrow::StringArray>& array,
76+
::avro::GenericDatum* datum) {
77+
if (datum->isUnion()) {
78+
if (!array->IsNull(row_idx)) {
79+
datum->selectBranch(1);
80+
datum->value<std::string>() = array->GetString(row_idx);
81+
}
82+
} else {
83+
datum->value<std::string>() = array->GetString(row_idx);
84+
}
85+
}
86+
87+
template <>
88+
inline void AvroAdaptor::SetValue<std::vector<uint8_t>, arrow::BinaryArray>(
89+
int32_t row_idx, const std::shared_ptr<arrow::BinaryArray>& array,
90+
::avro::GenericDatum* datum) {
91+
if (datum->isUnion()) {
92+
if (!array->IsNull(row_idx)) {
93+
datum->selectBranch(1);
94+
std::string_view view = array->Value(row_idx);
95+
datum->value<std::vector<uint8_t>>() = std::vector<uint8_t>(view.begin(), view.end());
96+
}
97+
} else {
98+
std::string_view view = array->Value(row_idx);
99+
datum->value<std::vector<uint8_t>>() = std::vector<uint8_t>(view.begin(), view.end());
100+
}
101+
}
102+
103+
} // namespace paimon::avro

0 commit comments

Comments
 (0)