Skip to content

Commit 1002270

Browse files
authored
feat: implement basic parquet writer and add roundtrip tests (#198)
Add parquet writer factory and basic parquet writer without metrics.
1 parent b80bf8d commit 1002270

15 files changed

+473
-35
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,3 +23,6 @@ cmake-build-release/
2323

2424
# intellij files
2525
.idea
26+
27+
# vscode files
28+
.vscode

src/iceberg/CMakeLists.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,8 @@ if(ICEBERG_BUILD_BUNDLE)
115115
parquet/parquet_data_util.cc
116116
parquet/parquet_reader.cc
117117
parquet/parquet_register.cc
118-
parquet/parquet_schema_util.cc)
118+
parquet/parquet_schema_util.cc
119+
parquet/parquet_writer.cc)
119120

120121
# Libraries to link with exported libiceberg_bundle.{so,a}.
121122
set(ICEBERG_BUNDLE_STATIC_BUILD_INTERFACE_LIBS)

src/iceberg/arrow/arrow_error_transform_internal.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ inline ErrorKind ToErrorKind(const ::arrow::Status& status) {
3030
switch (status.code()) {
3131
case ::arrow::StatusCode::IOError:
3232
return ErrorKind::kIOError;
33+
case ::arrow::StatusCode::NotImplemented:
34+
return ErrorKind::kNotImplemented;
3335
default:
3436
return ErrorKind::kUnknownError;
3537
}

src/iceberg/constants.h

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
#include <string_view>
23+
24+
namespace iceberg {

/// \brief Metadata key under which a field id is stored on Arrow schema fields.
///
/// Declared `inline` so that every translation unit including this header refers
/// to one shared definition rather than to its own internal-linkage copy.
inline constexpr std::string_view kParquetFieldIdKey = "PARQUET:field_id";

}  // namespace iceberg

src/iceberg/parquet/parquet_reader.cc

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,9 +125,9 @@ class ParquetReader::Impl {
125125
arrow_reader_properties.set_arrow_extensions_enabled(true);
126126

127127
// Open the Parquet file reader
128-
ICEBERG_ASSIGN_OR_RAISE(auto input_stream, OpenInputStream(options));
128+
ICEBERG_ASSIGN_OR_RAISE(input_stream_, OpenInputStream(options));
129129
auto file_reader =
130-
::parquet::ParquetFileReader::Open(std::move(input_stream), reader_properties);
130+
::parquet::ParquetFileReader::Open(input_stream_, reader_properties);
131131
ICEBERG_ARROW_RETURN_NOT_OK(::parquet::arrow::FileReader::Make(
132132
pool_, std::move(file_reader), arrow_reader_properties, &reader_));
133133

@@ -169,6 +169,7 @@ class ParquetReader::Impl {
169169
}
170170

171171
reader_.reset();
172+
ICEBERG_ARROW_RETURN_NOT_OK(input_stream_->Close());
172173
return {};
173174
}
174175

@@ -236,6 +237,8 @@ class ParquetReader::Impl {
236237
std::shared_ptr<::iceberg::Schema> read_schema_;
237238
// The projection result to apply to the read schema.
238239
SchemaProjection projection_;
240+
// The input stream to read Parquet file.
241+
std::shared_ptr<::arrow::io::RandomAccessFile> input_stream_;
239242
// Parquet file reader to create RecordBatchReader.
240243
std::unique_ptr<::parquet::arrow::FileReader> reader_;
241244
// The context to keep track of the reading progress.

src/iceberg/parquet/parquet_register.cc

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@
2121

2222
namespace iceberg::parquet {
2323

24-
void RegisterWriter() {}
25-
2624
void RegisterAll() {
2725
RegisterReader();
2826
RegisterWriter();

src/iceberg/parquet/parquet_schema_util.cc

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include <parquet/arrow/schema.h>
2727
#include <parquet/schema.h>
2828

29+
#include "iceberg/constants.h"
2930
#include "iceberg/metadata_columns.h"
3031
#include "iceberg/parquet/parquet_schema_util_internal.h"
3132
#include "iceberg/result.h"
@@ -38,8 +39,6 @@ namespace iceberg::parquet {
3839

3940
namespace {
4041

41-
constexpr std::string_view kParquetFieldIdKey = "PARQUET:field_id";
42-
4342
std::optional<int32_t> FieldIdFromMetadata(
4443
const std::shared_ptr<const ::arrow::KeyValueMetadata>& metadata) {
4544
if (!metadata) {
Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/parquet/parquet_writer.h"
21+
22+
#include <memory>
23+
24+
#include <arrow/c/bridge.h>
25+
#include <arrow/record_batch.h>
26+
#include <arrow/util/key_value_metadata.h>
27+
#include <parquet/arrow/schema.h>
28+
#include <parquet/arrow/writer.h>
29+
#include <parquet/file_writer.h>
30+
#include <parquet/properties.h>
31+
32+
#include "iceberg/arrow/arrow_error_transform_internal.h"
33+
#include "iceberg/arrow/arrow_fs_file_io_internal.h"
34+
#include "iceberg/schema_internal.h"
35+
#include "iceberg/util/checked_cast.h"
36+
#include "iceberg/util/macros.h"
37+
38+
namespace iceberg::parquet {
39+
40+
namespace {
41+
42+
/// \brief Open an Arrow output stream for the file named by `options.path`.
///
/// `options.io` is downcast to the Arrow-filesystem-backed FileIO so that its
/// underlying filesystem can create the stream.
///
/// \param options Writer options supplying the FileIO and the destination path.
/// \return The opened output stream, or an error if the filesystem cannot open it.
Result<std::shared_ptr<::arrow::io::OutputStream>> OpenOutputStream(
    const WriterOptions& options) {
  auto arrow_io =
      internal::checked_pointer_cast<arrow::ArrowFileSystemFileIO>(options.io);
  ICEBERG_ARROW_ASSIGN_OR_RETURN(auto stream,
                                 arrow_io->fs()->OpenOutputStream(options.path));
  return stream;
}
48+
49+
} // namespace
50+
51+
/// \brief Private implementation of ParquetWriter built on Arrow's Parquet writer.
class ParquetWriter::Impl {
 public:
  /// \brief Convert the Iceberg schema, open the destination and create the writer.
  ///
  /// \param options Writer options providing the schema, the FileIO and the path.
  /// \return An error if schema conversion, opening the output stream or creating
  /// the Parquet writer fails.
  Status Open(const WriterOptions& options) {
    auto props = ::parquet::WriterProperties::Builder().memory_pool(pool_)->build();
    auto arrow_props = ::parquet::default_arrow_writer_properties();

    // Iceberg schema -> Arrow C data interface -> arrow::Schema.
    ArrowSchema exported_schema;
    ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*options.schema, &exported_schema));
    ICEBERG_ARROW_ASSIGN_OR_RETURN(arrow_schema_,
                                   ::arrow::ImportSchema(&exported_schema));

    // Derive the Parquet schema from the Arrow schema.
    std::shared_ptr<::parquet::SchemaDescriptor> descriptor;
    ICEBERG_ARROW_RETURN_NOT_OK(::parquet::arrow::ToParquetSchema(
        arrow_schema_.get(), *props, *arrow_props, &descriptor));
    auto root_node = std::static_pointer_cast<::parquet::schema::GroupNode>(
        descriptor->schema_root());

    // The stream is kept as a member so that Close() can query its position and
    // close it once the writer is finished.
    ICEBERG_ASSIGN_OR_RAISE(output_stream_, OpenOutputStream(options));
    auto parquet_file_writer = ::parquet::ParquetFileWriter::Open(
        output_stream_, std::move(root_node), std::move(props));
    ICEBERG_ARROW_RETURN_NOT_OK(::parquet::arrow::FileWriter::Make(
        pool_, std::move(parquet_file_writer), arrow_schema_, std::move(arrow_props),
        &writer_));

    return {};
  }

  /// \brief Import `array` as a record batch of the writer schema and write it.
  ///
  /// \param array Arrow C data interface array holding the rows to write.
  /// \return An error if the import or the write fails.
  Status Write(ArrowArray array) {
    ICEBERG_ARROW_ASSIGN_OR_RETURN(auto record_batch,
                                   ::arrow::ImportRecordBatch(&array, arrow_schema_));
    ICEBERG_ARROW_RETURN_NOT_OK(writer_->WriteRecordBatch(*record_batch));
    return {};
  }

  /// \brief Finish the file, record its row group offsets and length, and close
  /// the output stream. Does nothing when there is no active writer.
  Status Close() {
    if (Closed()) {
      return {};  // Nothing left to close.
    }

    ICEBERG_ARROW_RETURN_NOT_OK(writer_->Close());

    // Collect the row group offsets from the file metadata before the writer is
    // released.
    const auto file_metadata = writer_->metadata();
    const int row_group_count = file_metadata->num_row_groups();
    split_offsets_.reserve(row_group_count);
    for (int index = 0; index < row_group_count; ++index) {
      split_offsets_.push_back(file_metadata->RowGroup(index)->file_offset());
    }
    writer_.reset();

    // The stream position is read before the stream is closed and is reported as
    // the file length.
    ICEBERG_ARROW_ASSIGN_OR_RETURN(total_bytes_, output_stream_->Tell());
    ICEBERG_ARROW_RETURN_NOT_OK(output_stream_->Close());
    return {};
  }

  /// \brief Whether there is no active Parquet writer.
  bool Closed() const { return writer_ == nullptr; }

  /// \brief Stream position recorded by Close(); 0 until then.
  int64_t length() const { return total_bytes_; }

  /// \brief Copy of the row group offsets recorded by Close().
  std::vector<int64_t> split_offsets() const { return split_offsets_; }

 private:
  // TODO(gangwu): make memory pool configurable
  ::arrow::MemoryPool* pool_ = ::arrow::default_memory_pool();
  // Arrow schema of the record batches written to the Parquet file.
  std::shared_ptr<::arrow::Schema> arrow_schema_;
  // The output stream the Parquet file is written to.
  std::shared_ptr<::arrow::io::OutputStream> output_stream_;
  // Parquet file writer that consumes the imported record batches.
  std::unique_ptr<::parquet::arrow::FileWriter> writer_;
  // Total length of the written Parquet file.
  int64_t total_bytes_{0};
  // Row group start offsets in the Parquet file.
  std::vector<int64_t> split_offsets_;
};
127+
128+
// Defined in this source file, where Impl is a complete type: the header only
// forward-declares Impl and holds it through a std::unique_ptr member.
ParquetWriter::~ParquetWriter() = default;
129+
130+
/// \brief Create a fresh implementation object and open it with `options`.
///
/// Any implementation left over from an earlier Open() call is replaced. The new
/// one is kept even when opening fails.
///
/// \param options Writer options forwarded to the implementation.
/// \return The status reported by the implementation's Open().
Status ParquetWriter::Open(const WriterOptions& options) {
  auto fresh_impl = std::make_unique<Impl>();
  impl_ = std::move(fresh_impl);
  return impl_->Open(options);
}
134+
135+
/// \brief Forward `array` to the implementation created by Open().
Status ParquetWriter::Write(ArrowArray array) { return impl_->Write(array); }
136+
137+
/// \brief Forward the close request to the implementation created by Open().
Status ParquetWriter::Close() { return impl_->Close(); }
138+
139+
std::optional<Metrics> ParquetWriter::metrics() {
140+
if (!impl_->Closed()) {
141+
return std::nullopt;
142+
}
143+
return {};
144+
}
145+
146+
std::optional<int64_t> ParquetWriter::length() {
147+
if (!impl_->Closed()) {
148+
return std::nullopt;
149+
}
150+
return impl_->length();
151+
}
152+
153+
std::vector<int64_t> ParquetWriter::split_offsets() {
154+
if (!impl_->Closed()) {
155+
return {};
156+
}
157+
return impl_->split_offsets();
158+
}
159+
160+
void RegisterWriter() {
161+
static WriterFactoryRegistry parquet_writer_register(
162+
FileFormatType::kParquet, []() -> Result<std::unique_ptr<Writer>> {
163+
return std::make_unique<ParquetWriter>();
164+
});
165+
}
166+
167+
} // namespace iceberg::parquet
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
#include "iceberg/file_writer.h"
23+
#include "iceberg/iceberg_bundle_export.h"
24+
25+
namespace iceberg::parquet {
26+
27+
/// \brief A writer that writes ArrowArray to Parquet files.
28+
class ICEBERG_BUNDLE_EXPORT ParquetWriter : public Writer {
 public:
  ParquetWriter() = default;

  /// Declared here and defined in the source file because Impl is only
  /// forward-declared in this header.
  ~ParquetWriter() override;

  /// \brief Open the file described by `options` for writing.
  Status Open(const WriterOptions& options) final;

  /// \brief Finish the Parquet file and close its output stream.
  Status Close() final;

  /// \brief Write the rows held by `array` to the file.
  Status Write(ArrowArray array) final;

  /// \brief Metrics of the written file; std::nullopt while the writer has not
  /// been closed.
  std::optional<Metrics> metrics() final;

  /// \brief Length in bytes of the written file; std::nullopt while the writer
  /// has not been closed.
  std::optional<int64_t> length() final;

  /// \brief Row group offsets of the written file; empty while the writer has
  /// not been closed.
  std::vector<int64_t> split_offsets() final;

 private:
  class Impl;
  /// Created by Open(); every other member function forwards to it.
  std::unique_ptr<Impl> impl_;
};
50+
51+
} // namespace iceberg::parquet

src/iceberg/schema_internal.cc

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
#include <optional>
2525
#include <string>
2626

27+
#include "iceberg/constants.h"
2728
#include "iceberg/schema.h"
2829
#include "iceberg/type.h"
2930
#include "iceberg/util/macros.h"
@@ -45,7 +46,7 @@ ArrowErrorCode ToArrowSchema(const Type& type, bool optional, std::string_view n
4546
NANOARROW_RETURN_NOT_OK(ArrowMetadataBuilderInit(&metadata_buffer, nullptr));
4647
if (field_id.has_value()) {
4748
NANOARROW_RETURN_NOT_OK(ArrowMetadataBuilderAppend(
48-
&metadata_buffer, ArrowCharView(std::string(kFieldIdKey).c_str()),
49+
&metadata_buffer, ArrowCharView(std::string(kParquetFieldIdKey).c_str()),
4950
ArrowCharView(std::to_string(field_id.value()).c_str())));
5051
}
5152

@@ -185,8 +186,8 @@ int32_t GetFieldId(const ArrowSchema& schema) {
185186
return kUnknownFieldId;
186187
}
187188

188-
ArrowStringView field_id_key{.data = kFieldIdKey.data(),
189-
.size_bytes = kFieldIdKey.size()};
189+
ArrowStringView field_id_key{.data = kParquetFieldIdKey.data(),
190+
.size_bytes = kParquetFieldIdKey.size()};
190191
ArrowStringView field_id_value;
191192
if (ArrowMetadataGetValue(schema.metadata, field_id_key, &field_id_value) !=
192193
NANOARROW_OK) {

0 commit comments

Comments
 (0)