Skip to content

Commit bcd4066

Browse files
author
xiao.dong
committed
feat: support manifest&list writer part 1
1 add avro writer and factory(without write func since converter pull/166 not finished yet) 2 add manifest and manifest list writer internal implement definition
1 parent 8ecee31 commit bcd4066

File tree

7 files changed

+402
-0
lines changed

7 files changed

+402
-0
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ set(ICEBERG_SOURCES
2323
expression/expression.cc
2424
expression/literal.cc
2525
file_reader.cc
26+
file_writer.cc
2627
json_internal.cc
2728
manifest_entry.cc
2829
manifest_list.cc
@@ -46,6 +47,8 @@ set(ICEBERG_SOURCES
4647
type.cc
4748
manifest_reader.cc
4849
manifest_reader_internal.cc
50+
manifest_writer.cc
51+
manifest_writer_internal.cc
4952
arrow_c_data_guard_internal.cc
5053
util/murmurhash3_internal.cc
5154
util/timepoint.cc
@@ -107,6 +110,7 @@ if(ICEBERG_BUILD_BUNDLE)
107110
arrow/arrow_fs_file_io.cc
108111
avro/avro_data_util.cc
109112
avro/avro_reader.cc
113+
avro/avro_writer.cc
110114
avro/avro_schema_util.cc
111115
avro/avro_register.cc
112116
avro/avro_stream_internal.cc

src/iceberg/avro/avro_writer.cc

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/avro/avro_writer.h"
21+
22+
#include <memory>
23+
24+
#include <arrow/array/builder_base.h>
25+
#include <arrow/c/bridge.h>
26+
#include <arrow/record_batch.h>
27+
#include <arrow/result.h>
28+
#include <avro/DataFile.hh>
29+
#include <avro/GenericDatum.hh>
30+
#include <avro/NodeImpl.hh>
31+
32+
#include "iceberg/arrow/arrow_fs_file_io_internal.h"
33+
#include "iceberg/avro/avro_schema_util_internal.h"
34+
#include "iceberg/avro/avro_stream_internal.h"
35+
#include "iceberg/schema.h"
36+
#include "iceberg/schema_internal.h"
37+
#include "iceberg/util/checked_cast.h"
38+
#include "iceberg/util/macros.h"
39+
40+
namespace iceberg::avro {
41+
42+
namespace {
43+
44+
Result<std::unique_ptr<AvroOutputStream>> CreateOutputStream(const WriterOptions& options,
45+
int64_t buffer_size) {
46+
auto io = internal::checked_pointer_cast<arrow::ArrowFileSystemFileIO>(options.io);
47+
auto result = io->fs()->OpenOutputStream(options.path);
48+
if (!result.ok()) {
49+
return IOError("Failed to open file {} for {}", options.path,
50+
result.status().message());
51+
}
52+
return std::make_unique<AvroOutputStream>(result.MoveValueUnsafe(), buffer_size);
53+
}
54+
55+
} // namespace
56+
57+
// A stateful context to keep track of the writing progress.
58+
struct WriteContext {};
59+
60+
class AvroWriter::Impl {
61+
public:
62+
Status Open(const WriterOptions& options) {
63+
write_arrow_schema_ = options.schema;
64+
ICEBERG_ASSIGN_OR_RAISE(write_schema_, FromArrowSchema(options.schema, std::nullopt));
65+
66+
auto root = std::make_shared<::avro::NodeRecord>();
67+
ToAvroNodeVisitor visitor;
68+
for (const auto& field : write_schema_->fields()) {
69+
::avro::NodePtr node;
70+
ICEBERG_RETURN_UNEXPECTED(visitor.Visit(field, &node));
71+
root->addLeaf(node);
72+
}
73+
avro_schema_ = std::make_shared<::avro::ValidSchema>(root);
74+
75+
// Open the output stream and adapt to the avro interface.
76+
constexpr int64_t kDefaultBufferSize = 1024 * 1024;
77+
ICEBERG_ASSIGN_OR_RAISE(auto output_stream,
78+
CreateOutputStream(options, kDefaultBufferSize));
79+
80+
writer_ = std::make_unique<::avro::DataFileWriter<::avro::GenericDatum>>(
81+
std::move(output_stream), *avro_schema_);
82+
return {};
83+
}
84+
85+
Status Write(ArrowArray /*data*/) {
86+
if (!context_) {
87+
ICEBERG_RETURN_UNEXPECTED(InitWriteContext());
88+
}
89+
// TODO(xiao.dong) convert data and write to avro
90+
return {};
91+
}
92+
93+
Status Close() {
94+
if (writer_ != nullptr) {
95+
writer_->close();
96+
writer_.reset();
97+
}
98+
context_.reset();
99+
return {};
100+
}
101+
102+
private:
103+
Status InitWriteContext() { return {}; }
104+
105+
private:
106+
ArrowSchema write_arrow_schema_;
107+
// The schema to write.
108+
std::shared_ptr<::iceberg::Schema> write_schema_;
109+
// The avro schema to write.
110+
std::shared_ptr<::avro::ValidSchema> avro_schema_;
111+
// The avro writer to write the data into a datum.
112+
std::unique_ptr<::avro::DataFileWriter<::avro::GenericDatum>> writer_;
113+
// The context to keep track of the writing progress.
114+
std::unique_ptr<WriteContext> context_;
115+
};
116+
117+
AvroWriter::~AvroWriter() = default;
118+
119+
Status AvroWriter::Write(ArrowArray data) { return impl_->Write(data); }
120+
121+
Status AvroWriter::Open(const WriterOptions& options) {
122+
impl_ = std::make_unique<Impl>();
123+
return impl_->Open(options);
124+
}
125+
126+
Status AvroWriter::Close() { return impl_->Close(); }
127+
128+
void AvroWriter::Register() {
129+
static WriterFactoryRegistry avro_writer_register(
130+
FileFormatType::kAvro,
131+
[]() -> Result<std::unique_ptr<Writer>> { return std::make_unique<AvroWriter>(); });
132+
}
133+
134+
} // namespace iceberg::avro

src/iceberg/avro/avro_writer.h

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
#include "iceberg/file_writer.h"
23+
#include "iceberg/iceberg_bundle_export.h"
24+
25+
namespace iceberg::avro {
26+
27+
/// \brief A writer ArrowArray to Avro files.
28+
class ICEBERG_BUNDLE_EXPORT AvroWriter : public Writer {
29+
public:
30+
AvroWriter() = default;
31+
32+
~AvroWriter() override;
33+
34+
Status Open(const WriterOptions& options) final;
35+
36+
Status Close() final;
37+
38+
Status Write(ArrowArray data) final;
39+
40+
/// \brief Register this Avro writer implementation.
41+
static void Register();
42+
43+
private:
44+
class Impl;
45+
std::unique_ptr<Impl> impl_;
46+
};
47+
48+
} // namespace iceberg::avro

src/iceberg/file_writer.cc

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/file_writer.h"
21+
22+
#include <unordered_map>
23+
24+
#include "iceberg/result.h"
25+
#include "iceberg/util/formatter.h"
26+
#include "iceberg/util/macros.h"
27+
28+
namespace iceberg {
29+
30+
namespace {
31+
32+
WriterFactory GetNotImplementedFactory(FileFormatType format_type) {
33+
return [format_type]() -> Result<std::unique_ptr<Writer>> {
34+
return NotImplemented("Missing writer factory for file format: {}", format_type);
35+
};
36+
}
37+
38+
} // namespace
39+
40+
WriterFactory& WriterFactoryRegistry::GetFactory(FileFormatType format_type) {
41+
static std::unordered_map<FileFormatType, WriterFactory> factories = {
42+
{FileFormatType::kAvro, GetNotImplementedFactory(FileFormatType::kAvro)},
43+
{FileFormatType::kParquet, GetNotImplementedFactory(FileFormatType::kParquet)},
44+
{FileFormatType::kOrc, GetNotImplementedFactory(FileFormatType::kOrc)},
45+
{FileFormatType::kPuffin, GetNotImplementedFactory(FileFormatType::kPuffin)},
46+
};
47+
return factories.at(format_type);
48+
}
49+
50+
WriterFactoryRegistry::WriterFactoryRegistry(FileFormatType format_type,
51+
WriterFactory factory) {
52+
GetFactory(format_type) = std::move(factory);
53+
}
54+
55+
Result<std::unique_ptr<Writer>> WriterFactoryRegistry::Open(
56+
FileFormatType format_type, const WriterOptions& options) {
57+
ICEBERG_ASSIGN_OR_RAISE(auto writer, GetFactory(format_type)());
58+
ICEBERG_RETURN_UNEXPECTED(writer->Open(options));
59+
return writer;
60+
}
61+
62+
} // namespace iceberg

src/iceberg/manifest_writer.cc

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/manifest_writer.h"
21+
22+
#include "iceberg/manifest_entry.h"
23+
#include "iceberg/manifest_list.h"
24+
#include "iceberg/manifest_writer_internal.h"
25+
#include "iceberg/schema.h"
26+
#include "iceberg/util/macros.h"
27+
28+
namespace iceberg {
29+
30+
Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeWriter(
31+
std::string_view manifest_location, std::shared_ptr<FileIO> file_io,
32+
std::shared_ptr<Schema> partition_schema) {
33+
auto manifest_entry_schema = ManifestEntry::TypeFromPartitionType(partition_schema);
34+
auto fields_span = manifest_entry_schema->fields();
35+
std::vector<SchemaField> fields(fields_span.begin(), fields_span.end());
36+
auto schema = std::make_shared<Schema>(fields);
37+
ICEBERG_ASSIGN_OR_RAISE(
38+
auto writer, WriterFactoryRegistry::Open(FileFormatType::kAvro,
39+
{.path = std::string(manifest_location),
40+
.io = std::move(file_io)}));
41+
return std::make_unique<ManifestWriterImpl>(std::move(writer), std::move(schema));
42+
}
43+
44+
Result<std::unique_ptr<ManifestListWriter>> ManifestListWriter::MakeWriter(
45+
std::string_view manifest_list_location, std::shared_ptr<FileIO> file_io) {
46+
std::vector<SchemaField> fields(ManifestFile::Type().fields().begin(),
47+
ManifestFile::Type().fields().end());
48+
auto schema = std::make_shared<Schema>(fields);
49+
ICEBERG_ASSIGN_OR_RAISE(auto writer, WriterFactoryRegistry::Open(
50+
FileFormatType::kAvro,
51+
{.path = std::string(manifest_list_location),
52+
.io = std::move(file_io)}));
53+
return std::make_unique<ManifestListWriterImpl>(std::move(writer), std::move(schema));
54+
}
55+
56+
} // namespace iceberg
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "manifest_writer_internal.h"
21+
22+
#include "iceberg/manifest_entry.h"
23+
#include "iceberg/manifest_list.h"
24+
#include "iceberg/schema.h"
25+
#include "iceberg/type.h"
26+
27+
namespace iceberg {
28+
29+
Status ManifestWriterImpl::WriteManifestEntries(
30+
const std::vector<ManifestEntry>& /*entries*/) const {
31+
// TODO(xiao.dong) convert entries to arrow data
32+
return {};
33+
}
34+
35+
Status ManifestListWriterImpl::WriteManifestFiles(
36+
const std::vector<ManifestFile>& /*files*/) const {
37+
// TODO(xiao.dong) convert manifest files to arrow data
38+
return {};
39+
}
40+
41+
} // namespace iceberg

0 commit comments

Comments
 (0)