Skip to content

Commit 7458797

Browse files
committed
basic support parquet writer
1 parent 9f13bac commit 7458797

File tree

4 files changed

+207
-1
lines changed

4 files changed

+207
-1
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,3 +23,6 @@ cmake-build-release/
2323

2424
# intellij files
2525
.idea
26+
27+
# vscode files
28+
.vscode

src/iceberg/CMakeLists.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,8 @@ if(ICEBERG_BUILD_BUNDLE)
116116
parquet/parquet_data_util.cc
117117
parquet/parquet_reader.cc
118118
parquet/parquet_register.cc
119-
parquet/parquet_schema_util.cc)
119+
parquet/parquet_schema_util.cc
120+
parquet/parquet_writer.cc)
120121

121122
# Libraries to link with exported libiceberg_bundle.{so,a}.
122123
set(ICEBERG_BUNDLE_STATIC_BUILD_INTERFACE_LIBS)
Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/parquet/parquet_writer.h"
21+
22+
#include <memory>
23+
24+
#include <arrow/c/bridge.h>
25+
#include <arrow/record_batch.h>
26+
#include <arrow/util/key_value_metadata.h>
27+
#include <parquet/arrow/schema.h>
28+
#include <parquet/arrow/writer.h>
29+
#include <parquet/file_writer.h>
30+
#include <parquet/properties.h>
31+
32+
#include "iceberg/arrow/arrow_error_transform_internal.h"
33+
#include "iceberg/arrow/arrow_fs_file_io_internal.h"
34+
#include "iceberg/schema_internal.h"
35+
#include "iceberg/util/checked_cast.h"
36+
#include "iceberg/util/macros.h"
37+
38+
namespace iceberg::parquet {
39+
40+
namespace {
41+
42+
Result<std::shared_ptr<::arrow::io::OutputStream>> OpenOutputStream(
43+
const WriterOptions& options) {
44+
auto io = internal::checked_pointer_cast<arrow::ArrowFileSystemFileIO>(options.io);
45+
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto output, io->fs()->OpenOutputStream(options.path));
46+
return output;
47+
}
48+
49+
} // namespace
50+
51+
class ParquetWriter::Impl {
52+
public:
53+
Status Open(const WriterOptions& options) {
54+
auto writer_properties =
55+
::parquet::WriterProperties::Builder().memory_pool(pool_)->build();
56+
auto arrow_writer_properties = ::parquet::default_arrow_writer_properties();
57+
58+
ArrowSchema c_schema;
59+
ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*options.schema, &c_schema));
60+
ICEBERG_ARROW_ASSIGN_OR_RETURN(arrow_schema_, ::arrow::ImportSchema(&c_schema));
61+
62+
std::shared_ptr<::parquet::SchemaDescriptor> schema_descriptor;
63+
ICEBERG_ARROW_RETURN_NOT_OK(
64+
::parquet::arrow::ToParquetSchema(arrow_schema_.get(), *writer_properties,
65+
*arrow_writer_properties, &schema_descriptor));
66+
auto schema_node = std::static_pointer_cast<::parquet::schema::GroupNode>(
67+
schema_descriptor->schema_root());
68+
69+
std::shared_ptr<::arrow::KeyValueMetadata> metadata =
70+
::arrow::key_value_metadata(options.properties);
71+
72+
ICEBERG_ASSIGN_OR_RAISE(auto output_stream, OpenOutputStream(options));
73+
auto file_writer = ::parquet::ParquetFileWriter::Open(
74+
std::move(output_stream), schema_node, writer_properties, metadata);
75+
ICEBERG_ARROW_RETURN_NOT_OK(::parquet::arrow::FileWriter::Make(
76+
pool_, std::move(file_writer), arrow_schema_, arrow_writer_properties, &writer_));
77+
78+
return {};
79+
}
80+
81+
Status Write(ArrowArray array) {
82+
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto batch,
83+
::arrow::ImportRecordBatch(&array, arrow_schema_));
84+
85+
ICEBERG_ARROW_RETURN_NOT_OK(writer_->WriteRecordBatch(*batch));
86+
87+
return {};
88+
}
89+
90+
// Close the writer and release resources
91+
Status Close() {
92+
if (writer_ != nullptr) {
93+
ICEBERG_ARROW_RETURN_NOT_OK(writer_->Close());
94+
writer_.reset();
95+
}
96+
return {};
97+
}
98+
99+
bool Closed() const { return writer_ == nullptr; }
100+
101+
private:
102+
// TODO(gangwu): make memory pool configurable
103+
::arrow::MemoryPool* pool_ = ::arrow::default_memory_pool();
104+
// Schema to write from the Parquet file.
105+
std::shared_ptr<::arrow::Schema> arrow_schema_;
106+
// Parquet file writer to write ArrowArray.
107+
std::unique_ptr<::parquet::arrow::FileWriter> writer_;
108+
};
109+
110+
ParquetWriter::~ParquetWriter() = default;
111+
112+
Status ParquetWriter::Open(const WriterOptions& options) {
113+
impl_ = std::make_unique<Impl>();
114+
return impl_->Open(options);
115+
}
116+
117+
Status ParquetWriter::Write(ArrowArray array) { return impl_->Write(array); }
118+
119+
Status ParquetWriter::Close() { return impl_->Close(); }
120+
121+
std::optional<Metrics> ParquetWriter::metrics() {
122+
if (!impl_->Closed()) {
123+
return std::nullopt;
124+
}
125+
return {};
126+
}
127+
128+
std::optional<int64_t> ParquetWriter::length() {
129+
if (!impl_->Closed()) {
130+
return std::nullopt;
131+
}
132+
return {};
133+
}
134+
135+
std::vector<int64_t> ParquetWriter::split_offsets() {
136+
if (!impl_->Closed()) {
137+
return {};
138+
}
139+
return {};
140+
}
141+
142+
void ParquetWriter::Register() {
143+
static WriterFactoryRegistry parquet_writer_register(
144+
FileFormatType::kParquet, []() -> Result<std::unique_ptr<Writer>> {
145+
return std::make_unique<ParquetWriter>();
146+
});
147+
}
148+
149+
} // namespace iceberg::parquet
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
#include "iceberg/file_writer.h"
23+
#include "iceberg/iceberg_bundle_export.h"
24+
25+
namespace iceberg::parquet {
26+
27+
/// \brief A writer that writes ArrowArray to Parquet files.
28+
class ICEBERG_BUNDLE_EXPORT ParquetWriter : public Writer {
29+
public:
30+
ParquetWriter() = default;
31+
32+
~ParquetWriter() override;
33+
34+
Status Open(const WriterOptions& options) final;
35+
36+
Status Close() final;
37+
38+
Status Write(ArrowArray array) final;
39+
40+
std::optional<Metrics> metrics() final;
41+
42+
std::optional<int64_t> length() final;
43+
44+
std::vector<int64_t> split_offsets() final;
45+
46+
static void Register();
47+
48+
private:
49+
class Impl;
50+
std::unique_ptr<Impl> impl_;
51+
};
52+
53+
} // namespace iceberg::parquet

0 commit comments

Comments
 (0)