Skip to content

Commit 978dfcf

Browse files
committed
add some tests
1 parent 7458797 commit 978dfcf

File tree

10 files changed

+245
-23
lines changed

10 files changed

+245
-23
lines changed

src/iceberg/constants.h

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
#include <string_view>
23+
24+
namespace iceberg {
25+
26+
/// \brief Metadata key that Apache Arrow C++ uses to store field IDs for Parquet.
///
/// Declared `inline` so that every translation unit including this header
/// refers to one shared definition instead of a per-TU internal-linkage copy.
inline constexpr std::string_view kParquetFieldIdKey = "PARQUET:field_id";
27+
28+
} // namespace iceberg

src/iceberg/parquet/parquet_reader.cc

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,9 +125,9 @@ class ParquetReader::Impl {
125125
arrow_reader_properties.set_arrow_extensions_enabled(true);
126126

127127
// Open the Parquet file reader
128-
ICEBERG_ASSIGN_OR_RAISE(auto input_stream, OpenInputStream(options));
128+
ICEBERG_ASSIGN_OR_RAISE(input_stream_, OpenInputStream(options));
129129
auto file_reader =
130-
::parquet::ParquetFileReader::Open(std::move(input_stream), reader_properties);
130+
::parquet::ParquetFileReader::Open(input_stream_, reader_properties);
131131
ICEBERG_ARROW_RETURN_NOT_OK(::parquet::arrow::FileReader::Make(
132132
pool_, std::move(file_reader), arrow_reader_properties, &reader_));
133133

@@ -169,6 +169,7 @@ class ParquetReader::Impl {
169169
}
170170

171171
reader_.reset();
172+
ICEBERG_ARROW_RETURN_NOT_OK(input_stream_->Close());
172173
return {};
173174
}
174175

@@ -240,6 +241,8 @@ class ParquetReader::Impl {
240241
std::unique_ptr<::parquet::arrow::FileReader> reader_;
241242
// The context to keep track of the reading progress.
242243
std::unique_ptr<ReadContext> context_;
244+
// The input stream to read Parquet file.
245+
std::shared_ptr<::arrow::io::RandomAccessFile> input_stream_;
243246
};
244247

245248
ParquetReader::~ParquetReader() = default;

src/iceberg/parquet/parquet_register.cc

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@
2121

2222
namespace iceberg::parquet {
2323

24-
void RegisterWriter() {}
25-
2624
void RegisterAll() {
2725
RegisterReader();
2826
RegisterWriter();

src/iceberg/parquet/parquet_schema_util.cc

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include <parquet/arrow/schema.h>
2727
#include <parquet/schema.h>
2828

29+
#include "iceberg/constants.h"
2930
#include "iceberg/metadata_columns.h"
3031
#include "iceberg/parquet/parquet_schema_util_internal.h"
3132
#include "iceberg/result.h"
@@ -38,8 +39,6 @@ namespace iceberg::parquet {
3839

3940
namespace {
4041

41-
constexpr std::string_view kParquetFieldIdKey = "PARQUET:field_id";
42-
4342
std::optional<int32_t> FieldIdFromMetadata(
4443
const std::shared_ptr<const ::arrow::KeyValueMetadata>& metadata) {
4544
if (!metadata) {

src/iceberg/parquet/parquet_writer.cc

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,9 @@ class ParquetWriter::Impl {
6969
std::shared_ptr<::arrow::KeyValueMetadata> metadata =
7070
::arrow::key_value_metadata(options.properties);
7171

72-
ICEBERG_ASSIGN_OR_RAISE(auto output_stream, OpenOutputStream(options));
73-
auto file_writer = ::parquet::ParquetFileWriter::Open(
74-
std::move(output_stream), schema_node, writer_properties, metadata);
72+
ICEBERG_ASSIGN_OR_RAISE(output_stream_, OpenOutputStream(options));
73+
auto file_writer = ::parquet::ParquetFileWriter::Open(output_stream_, schema_node,
74+
writer_properties, metadata);
7575
ICEBERG_ARROW_RETURN_NOT_OK(::parquet::arrow::FileWriter::Make(
7676
pool_, std::move(file_writer), arrow_schema_, arrow_writer_properties, &writer_));
7777

@@ -89,10 +89,13 @@ class ParquetWriter::Impl {
8989

9090
// Close the writer and release resources
9191
Status Close() {
92-
if (writer_ != nullptr) {
93-
ICEBERG_ARROW_RETURN_NOT_OK(writer_->Close());
94-
writer_.reset();
92+
if (writer_ == nullptr) {
93+
return {}; // Already closed
9594
}
95+
96+
ICEBERG_ARROW_RETURN_NOT_OK(writer_->Close());
97+
writer_.reset();
98+
ICEBERG_ARROW_RETURN_NOT_OK(output_stream_->Close());
9699
return {};
97100
}
98101

@@ -105,6 +108,8 @@ class ParquetWriter::Impl {
105108
std::shared_ptr<::arrow::Schema> arrow_schema_;
106109
// Parquet file writer to write ArrowArray.
107110
std::unique_ptr<::parquet::arrow::FileWriter> writer_;
111+
// The output stream to write Parquet file.
112+
std::shared_ptr<::arrow::io::OutputStream> output_stream_;
108113
};
109114

110115
ParquetWriter::~ParquetWriter() = default;
@@ -139,7 +144,7 @@ std::vector<int64_t> ParquetWriter::split_offsets() {
139144
return {};
140145
}
141146

142-
void ParquetWriter::Register() {
147+
void RegisterWriter() {
143148
static WriterFactoryRegistry parquet_writer_register(
144149
FileFormatType::kParquet, []() -> Result<std::unique_ptr<Writer>> {
145150
return std::make_unique<ParquetWriter>();

src/iceberg/parquet/parquet_writer.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,6 @@ class ICEBERG_BUNDLE_EXPORT ParquetWriter : public Writer {
4343

4444
std::vector<int64_t> split_offsets() final;
4545

46-
static void Register();
47-
4846
private:
4947
class Impl;
5048
std::unique_ptr<Impl> impl_;

src/iceberg/schema_internal.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
#include <nanoarrow/nanoarrow.h>
2626

27+
#include "iceberg/constants.h"
2728
#include "iceberg/iceberg_export.h"
2829
#include "iceberg/result.h"
2930
#include "iceberg/type_fwd.h"
@@ -33,7 +34,7 @@ namespace iceberg {
3334
// Apache Arrow C++ uses "PARQUET:field_id" to store field IDs for Parquet.
3435
// Iceberg uses the same key: kFieldIdKey below is an alias of
3536
// kParquetFieldIdKey, which is defined in iceberg/constants.h.
36-
constexpr std::string_view kFieldIdKey = "ICEBERG:field_id";
37+
constexpr std::string_view kFieldIdKey = kParquetFieldIdKey;
3738

3839
/// \brief Convert an Iceberg schema to an Arrow schema.
3940
///

src/iceberg/type.cc

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -319,4 +319,16 @@ std::shared_ptr<FixedType> fixed(int32_t length) {
319319
return std::make_shared<FixedType>(length);
320320
}
321321

322+
std::shared_ptr<MapType> map(SchemaField key, SchemaField value) {  // by-value sink parameters
323+
return std::make_shared<MapType>(std::move(key), std::move(value));  // move instead of copying, like list()/struct_()
324+
}
325+
326+
std::shared_ptr<ListType> list(SchemaField element) {  // factory: wraps a new ListType in a shared_ptr
327+
return std::make_shared<ListType>(std::move(element));  // `element` is passed on as an rvalue, not copied
328+
}
329+
330+
std::shared_ptr<StructType> struct_(std::vector<SchemaField> fields) {  // factory: wraps a new StructType in a shared_ptr
331+
return std::make_shared<StructType>(std::move(fields));  // the vector is passed on as an rvalue, not copied
332+
}
333+
322334
} // namespace iceberg

src/iceberg/type.h

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -487,6 +487,22 @@ ICEBERG_EXPORT std::shared_ptr<DecimalType> decimal(int32_t precision, int32_t s
487487
/// \return A shared pointer to the FixedType instance.
488488
ICEBERG_EXPORT std::shared_ptr<FixedType> fixed(int32_t length);
489489

490+
/// \brief Create a StructType with the given fields.
491+
/// \param fields The fields of the struct.
492+
/// \return A shared pointer to the StructType instance.
493+
ICEBERG_EXPORT std::shared_ptr<StructType> struct_(std::vector<SchemaField> fields);
494+
495+
/// \brief Create a ListType with the given element field.
496+
/// \param element The element field of the list.
497+
/// \return A shared pointer to the ListType instance.
498+
ICEBERG_EXPORT std::shared_ptr<ListType> list(SchemaField element);
499+
500+
/// \brief Create a MapType with the given key and value fields.
501+
/// \param key The key field of the map.
502+
/// \param value The value field of the map.
503+
/// \return A shared pointer to the MapType instance.
504+
ICEBERG_EXPORT std::shared_ptr<MapType> map(SchemaField key, SchemaField value);
505+
490506
/// @}
491507

492508
} // namespace iceberg

0 commit comments

Comments
 (0)