Skip to content

Commit 6a6011b

Browse files
author
xiao.dong
committed
feat: support avro writer
1. Add the Avro writer and its factory (the write function is not implemented yet because the converter in pull/166 is not finished). 2. Add the file metrics definition.
1 parent 7063f2b commit 6a6011b

File tree

6 files changed

+324
-1
lines changed

6 files changed

+324
-1
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ set(ICEBERG_SOURCES
2323
expression/expression.cc
2424
expression/literal.cc
2525
file_reader.cc
26+
file_writer.cc
2627
json_internal.cc
2728
manifest_entry.cc
2829
manifest_list.cc
@@ -107,6 +108,7 @@ if(ICEBERG_BUILD_BUNDLE)
107108
arrow/arrow_fs_file_io.cc
108109
avro/avro_data_util.cc
109110
avro/avro_reader.cc
111+
avro/avro_writer.cc
110112
avro/avro_schema_util.cc
111113
avro/avro_register.cc
112114
avro/avro_stream_internal.cc

src/iceberg/avro/avro_writer.cc

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/avro/avro_writer.h"
21+
22+
#include <memory>
23+
24+
#include <arrow/array/builder_base.h>
25+
#include <arrow/c/bridge.h>
26+
#include <arrow/record_batch.h>
27+
#include <arrow/result.h>
28+
#include <avro/DataFile.hh>
29+
#include <avro/GenericDatum.hh>
30+
#include <avro/NodeImpl.hh>
31+
32+
#include "iceberg/arrow/arrow_fs_file_io_internal.h"
33+
#include "iceberg/avro/avro_schema_util_internal.h"
34+
#include "iceberg/avro/avro_stream_internal.h"
35+
#include "iceberg/schema.h"
36+
#include "iceberg/util/checked_cast.h"
37+
#include "iceberg/util/macros.h"
38+
39+
namespace iceberg::avro {
40+
41+
namespace {
42+
43+
/// \brief Open `options.path` for writing and wrap the stream for the Avro library.
///
/// `options.io` is downcast to `ArrowFileSystemFileIO`, and the file is opened through
/// the filesystem that object exposes.
///
/// \param options Writer options; `path` and `io` are read here.
/// \param buffer_size Buffer size handed to the `AvroOutputStream` constructor.
/// \return The wrapped output stream, or an IOError carrying the underlying status
/// message when the file cannot be opened.
Result<std::unique_ptr<AvroOutputStream>> CreateOutputStream(const WriterOptions& options,
                                                             int64_t buffer_size) {
  auto arrow_io = internal::checked_pointer_cast<arrow::ArrowFileSystemFileIO>(options.io);
  auto maybe_stream = arrow_io->fs()->OpenOutputStream(options.path);
  if (maybe_stream.ok()) {
    return std::make_unique<AvroOutputStream>(maybe_stream.MoveValueUnsafe(),
                                              buffer_size);
  }
  return IOError("Failed to open file {} for {}", options.path,
                 maybe_stream.status().message());
}
53+
54+
} // namespace
55+
56+
// Stateful context intended to track the progress of a write.
// It is an empty placeholder for now: no members have been added yet.
struct WriteContext {};
58+
59+
/// \brief Private implementation of AvroWriter.
///
/// Holds the Iceberg schema, the Avro schema derived from it, and the Avro data file
/// writer. A null `writer_` means "closed" (or never successfully opened).
class AvroWriter::Impl {
 public:
  /// \brief Derive the Avro schema from `options.schema`, open the output file and
  /// create the Avro data file writer.
  ///
  /// \return An error if the schema conversion or opening the output stream fails.
  Status Open(const WriterOptions& options) {
    write_schema_ = options.schema;

    // Convert the Iceberg schema into an Avro node tree, then into a ValidSchema.
    ::avro::NodePtr avro_root;
    ICEBERG_RETURN_UNEXPECTED(ToAvroNodeVisitor{}.Visit(*write_schema_, &avro_root));
    avro_schema_ = std::make_shared<::avro::ValidSchema>(avro_root);

    // Open the output stream (1 MiB buffer) and adapt it to the avro interface.
    constexpr int64_t kBufferSize = 1024 * 1024;
    ICEBERG_ASSIGN_OR_RAISE(auto stream, CreateOutputStream(options, kBufferSize));

    using DatumWriter = ::avro::DataFileWriter<::avro::GenericDatum>;
    writer_ = std::make_unique<DatumWriter>(std::move(stream), *avro_schema_);
    return {};
  }

  /// \brief Write a batch of data. Currently a no-op that always succeeds.
  Status Write(ArrowArray /*data*/) {
    // TODO(xiao.dong) convert data and write to avro
    // total_bytes_+= written_bytes;
    return {};
  }

  /// \brief Close and release the Avro writer. Does nothing if already closed.
  Status Close() {
    if (writer_ == nullptr) {
      return {};
    }
    writer_->close();
    writer_.reset();
    return {};
  }

  /// \brief True when no Avro writer is held, i.e. before a successful Open() or after
  /// Close().
  bool Closed() const { return writer_ == nullptr; }

  /// \brief Value of the byte counter. Nothing updates it yet (see the TODO in Write),
  /// so it is currently always 0.
  int64_t length() { return total_bytes_; }

 private:
  // Running byte counter reported by length().
  int64_t total_bytes_ = 0;
  // The schema to write.
  std::shared_ptr<::iceberg::Schema> write_schema_;
  // The avro schema to write.
  std::shared_ptr<::avro::ValidSchema> avro_schema_;
  // The avro data file writer; null while closed.
  std::unique_ptr<::avro::DataFileWriter<::avro::GenericDatum>> writer_;
};
106+
107+
// Defaulted here rather than in the header: the header only forward-declares Impl, and
// destroying std::unique_ptr<Impl> requires the complete type defined above.
AvroWriter::~AvroWriter() = default;
108+
109+
/// \brief Forward a batch of data to the implementation.
///
/// \param data The Arrow array to write.
/// \return An IOError if Open() has not been called yet (there is no implementation
/// object to write to); otherwise the status returned by the implementation.
Status AvroWriter::Write(ArrowArray data) {
  // impl_ is only created by Open(); dereferencing it before that is undefined behavior.
  if (impl_ == nullptr) {
    return IOError("Avro writer is not opened");
  }
  return impl_->Write(data);
}
110+
111+
/// \brief Create a fresh implementation object and open it with `options`.
///
/// The implementation object is installed before it is opened, so it stays in place
/// (in the closed state) even when opening fails.
///
/// \return The status of opening the underlying implementation.
Status AvroWriter::Open(const WriterOptions& options) {
  impl_ = std::make_unique<Impl>();
  Status open_status = impl_->Open(options);
  return open_status;
}
115+
116+
/// \brief Close the writer.
///
/// A no-op that reports success when the writer was never opened or is already closed,
/// so it can be called unconditionally from cleanup paths.
///
/// \return The status of closing the underlying implementation.
Status AvroWriter::Close() {
  // impl_ is only created by Open(); guard against Close() on a never-opened writer.
  if (impl_ == nullptr || impl_->Closed()) {
    return {};
  }
  return impl_->Close();
}
122+
123+
/// \brief Get the file metrics.
///
/// Metrics collection is not implemented yet, so a default-constructed Metrics is
/// returned in every state.
Metrics AvroWriter::metrics() {
  // impl_ is only created by Open(); never dereference it on a never-opened writer.
  if (impl_ == nullptr || !impl_->Closed()) {
    return {};
  }
  // TODO(xiao.dong) implement metrics
  return {};
}
130+
131+
int64_t AvroWriter::length() {
132+
if (impl_->Closed()) {
133+
return impl_->length();
134+
}
135+
return 0;
136+
}
137+
138+
/// \brief Recommended split locations. This writer reports none, so the list is
/// always empty.
std::vector<int64_t> AvroWriter::split_offsets() {
  return std::vector<int64_t>{};
}
139+
140+
void AvroWriter::Register() {
141+
static WriterFactoryRegistry avro_writer_register(
142+
FileFormatType::kAvro,
143+
[]() -> Result<std::unique_ptr<Writer>> { return std::make_unique<AvroWriter>(); });
144+
}
145+
146+
} // namespace iceberg::avro

src/iceberg/avro/avro_writer.h

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
#include "iceberg/file_writer.h"
23+
#include "iceberg/iceberg_bundle_export.h"
24+
25+
namespace iceberg::avro {
26+
27+
/// \brief A writer ArrowArray to Avro files.
28+
class ICEBERG_BUNDLE_EXPORT AvroWriter : public Writer {
29+
public:
30+
AvroWriter() = default;
31+
32+
~AvroWriter() override;
33+
34+
Status Open(const WriterOptions& options) final;
35+
36+
Status Close() final;
37+
38+
Status Write(ArrowArray data) final;
39+
40+
Metrics metrics() final;
41+
42+
int64_t length() final;
43+
44+
std::vector<int64_t> split_offsets() final;
45+
46+
/// \brief Register this Avro writer implementation.
47+
static void Register();
48+
49+
private:
50+
class Impl;
51+
std::unique_ptr<Impl> impl_;
52+
};
53+
54+
} // namespace iceberg::avro

src/iceberg/file_writer.cc

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/file_writer.h"
21+
22+
#include <unordered_map>
23+
24+
#include "iceberg/result.h"
25+
#include "iceberg/util/formatter.h"
26+
#include "iceberg/util/macros.h"
27+
28+
namespace iceberg {
29+
30+
namespace {
31+
32+
/// \brief Build a placeholder factory for a format without a registered writer.
///
/// \param format_type The file format the placeholder stands in for.
/// \return A factory that always fails with NotImplemented, naming `format_type`.
WriterFactory GetNotImplementedFactory(FileFormatType format_type) {
  auto missing_factory = [format_type]() -> Result<std::unique_ptr<Writer>> {
    return NotImplemented("Missing writer factory for file format: {}", format_type);
  };
  return missing_factory;
}
37+
38+
} // namespace
39+
40+
/// \brief Look up the mutable factory slot for a file format.
///
/// The table is a function-local static that starts out with a "not implemented"
/// placeholder for Avro, Parquet, ORC and Puffin.
///
/// \param format_type The file format to look up.
/// \return A reference to the factory stored for `format_type`. The lookup uses
/// `at()`, so a format missing from the table raises std::out_of_range.
WriterFactory& WriterFactoryRegistry::GetFactory(FileFormatType format_type) {
  using FactoryMap = std::unordered_map<FileFormatType, WriterFactory>;
  static FactoryMap factories = [] {
    FactoryMap defaults;
    for (const FileFormatType type : {FileFormatType::kAvro, FileFormatType::kParquet,
                                      FileFormatType::kOrc, FileFormatType::kPuffin}) {
      defaults.emplace(type, GetNotImplementedFactory(type));
    }
    return defaults;
  }();
  return factories.at(format_type);
}
49+
50+
/// \brief Register `factory` as the writer factory for `format_type`, replacing
/// whatever factory was stored for that format before.
WriterFactoryRegistry::WriterFactoryRegistry(FileFormatType format_type,
                                             WriterFactory factory) {
  WriterFactory& slot = GetFactory(format_type);
  slot = std::move(factory);
}
54+
55+
/// \brief Create a writer for `format_type` and open it with `options`.
///
/// \param format_type The file format to write.
/// \param options Options passed to the writer's Open().
/// \return The opened writer, or the error from the factory or from Open().
Result<std::unique_ptr<Writer>> WriterFactoryRegistry::Open(
    FileFormatType format_type, const WriterOptions& options) {
  WriterFactory& factory = GetFactory(format_type);
  ICEBERG_ASSIGN_OR_RAISE(auto writer, factory());
  ICEBERG_RETURN_UNEXPECTED(writer->Open(options));
  return writer;
}
61+
62+
} // namespace iceberg

src/iceberg/file_writer.h

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,9 @@
2828

2929
#include "iceberg/arrow_c_data.h"
3030
#include "iceberg/file_format.h"
31+
#include "iceberg/metrics.h"
3132
#include "iceberg/result.h"
33+
#include "iceberg/schema.h"
3234
#include "iceberg/type_fwd.h"
3335

3436
namespace iceberg {
@@ -38,7 +40,7 @@ struct ICEBERG_EXPORT WriterOptions {
3840
/// \brief The path to the file to write.
3941
std::string path;
4042
/// \brief The schema of the data to write.
41-
ArrowSchema schema;
43+
std::shared_ptr<Schema> schema;
4244
/// \brief FileIO instance to open the file. Writer implementations should down cast it
4345
/// to the specific FileIO implementation. By default, the `iceberg-bundle` library uses
4446
/// `ArrowFileSystemFileIO` as the default implementation.
@@ -65,6 +67,20 @@ class ICEBERG_EXPORT Writer {
6567
///
6668
/// \return Status of write results.
6769
virtual Status Write(ArrowArray data) = 0;
70+
71+
/// \brief Get the file statistics.
72+
/// Only valid after the file is closed.
73+
virtual Metrics metrics() = 0;
74+
75+
/// \brief Get the file length.
76+
/// Only valid after the file is closed.
77+
virtual int64_t length() = 0;
78+
79+
/// \brief Returns a list of recommended split locations, if applicable, empty
80+
/// otherwise. When available, this information is used for planning scan tasks whose
81+
/// boundaries are determined by these offsets. The returned list must be sorted in
82+
/// ascending order. Only valid after the file is closed.
83+
virtual std::vector<int64_t> split_offsets() = 0;
6884
};
6985

7086
/// \brief Factory function to create a writer of a specific file format.

src/iceberg/metrics.h

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
/// \file iceberg/metrics.h
23+
/// Iceberg file format metrics
24+
25+
#include <unordered_map>
26+
27+
#include "iceberg/expression/literal.h"
28+
#include "iceberg/iceberg_export.h"
29+
30+
namespace iceberg {
31+
32+
/// \brief Iceberg file format metrics
33+
struct ICEBERG_EXPORT Metrics {
34+
int64_t row_count_ = 0;
35+
std::unordered_map<int64_t, int64_t> column_sizes;
36+
std::unordered_map<int64_t, int64_t> value_counts;
37+
std::unordered_map<int64_t, int64_t> null_value_counts;
38+
std::unordered_map<int64_t, int64_t> nan_value_counts;
39+
std::unordered_map<int64_t, Literal> lower_bounds;
40+
std::unordered_map<int64_t, Literal> upper_bounds;
41+
};
42+
43+
} // namespace iceberg

0 commit comments

Comments
 (0)