2020#include " iceberg/parquet/parquet_writer.h"
2121
2222#include < memory>
23+ #include < optional>
2324#include < string_view>
25+ #include < utility>
26+ #include < vector>
2427
2528#include < arrow/c/bridge.h>
2629#include < arrow/record_batch.h>
3336
3437#include " iceberg/arrow/arrow_io_internal.h"
3538#include " iceberg/arrow/arrow_status_internal.h"
39+ #include " iceberg/parquet/parquet_data_util_internal.h"
40+ #include " iceberg/schema.h"
3641#include " iceberg/schema_internal.h"
42+ #include " iceberg/schema_util.h"
43+ #include " iceberg/type.h"
3744#include " iceberg/util/macros.h"
3845
3946namespace iceberg ::parquet {
@@ -81,6 +88,75 @@ Result<std::optional<int32_t>> ParseCodecLevel(const WriterProperties& propertie
8188 return level;
8289}
8390
91+ Result<std::shared_ptr<Type>> PruneUnknownType (const std::shared_ptr<Type>& type);
92+
93+ Result<std::optional<SchemaField>> PruneUnknownField (const SchemaField& field,
94+ bool allow_skip_unknown) {
95+ if (field.type ()->type_id () == TypeId::kUnknown ) {
96+ if (!allow_skip_unknown) {
97+ return NotSupported (
98+ " Cannot write unknown field '{}' inside a list or map to Parquet" ,
99+ field.name ());
100+ }
101+ return std::nullopt ;
102+ }
103+
104+ ICEBERG_ASSIGN_OR_RAISE (auto pruned_type, PruneUnknownType (field.type ()));
105+ return SchemaField (field.field_id (), field.name (), std::move (pruned_type),
106+ field.optional (), field.doc ());
107+ }
108+
109+ Result<std::shared_ptr<Type>> PruneUnknownType (const std::shared_ptr<Type>& type) {
110+ switch (type->type_id ()) {
111+ case TypeId::kUnknown :
112+ return NotSupported (" Cannot write unknown field to Parquet" );
113+ case TypeId::kStruct : {
114+ const auto & struct_type = static_cast <const StructType&>(*type);
115+ std::vector<SchemaField> fields;
116+ for (const auto & field : struct_type.fields ()) {
117+ ICEBERG_ASSIGN_OR_RAISE (auto pruned_field,
118+ PruneUnknownField (field, /* allow_skip_unknown=*/ true ));
119+ if (pruned_field.has_value ()) {
120+ fields.emplace_back (std::move (pruned_field.value ()));
121+ }
122+ }
123+ return std::make_shared<StructType>(std::move (fields));
124+ }
125+ case TypeId::kList : {
126+ const auto & list_type = static_cast <const ListType&>(*type);
127+ ICEBERG_ASSIGN_OR_RAISE (
128+ auto pruned_element,
129+ PruneUnknownField (list_type.element (), /* allow_skip_unknown=*/ false ));
130+ return std::make_shared<ListType>(std::move (pruned_element.value ()));
131+ }
132+ case TypeId::kMap : {
133+ const auto & map_type = static_cast <const MapType&>(*type);
134+ ICEBERG_ASSIGN_OR_RAISE (
135+ auto pruned_key,
136+ PruneUnknownField (map_type.key (), /* allow_skip_unknown=*/ false ));
137+ ICEBERG_ASSIGN_OR_RAISE (
138+ auto pruned_value,
139+ PruneUnknownField (map_type.value (), /* allow_skip_unknown=*/ false ));
140+ return std::make_shared<MapType>(std::move (pruned_key.value ()),
141+ std::move (pruned_value.value ()));
142+ }
143+ default :
144+ return type;
145+ }
146+ }
147+
148+ Result<std::shared_ptr<Schema>> PruneUnknownFields (const Schema& schema) {
149+ std::vector<SchemaField> fields;
150+ for (const auto & field : schema.fields ()) {
151+ ICEBERG_ASSIGN_OR_RAISE (auto pruned_field,
152+ PruneUnknownField (field, /* allow_skip_unknown=*/ true ));
153+ if (pruned_field.has_value ()) {
154+ fields.emplace_back (std::move (pruned_field.value ()));
155+ }
156+ }
157+ return std::make_shared<Schema>(std::move (fields), schema.schema_id ());
158+ }
159+
84160} // namespace
85161
86162class ParquetWriter ::Impl {
@@ -97,8 +173,17 @@ class ParquetWriter::Impl {
97173 auto writer_properties = properties_builder.memory_pool (pool_)->build ();
98174 auto arrow_writer_properties = ::parquet::default_arrow_writer_properties ();
99175
176+ ArrowSchema input_c_schema;
177+ ICEBERG_RETURN_UNEXPECTED (ToArrowSchema (*options.schema , &input_c_schema));
178+ ICEBERG_ARROW_ASSIGN_OR_RETURN (input_arrow_schema_,
179+ ::arrow::ImportSchema (&input_c_schema));
180+
181+ ICEBERG_ASSIGN_OR_RAISE (write_schema_, PruneUnknownFields (*options.schema ));
182+ ICEBERG_ASSIGN_OR_RAISE (write_projection_, Project (*write_schema_, *options.schema ,
183+ /* prune_source=*/ false ));
184+
100185 ArrowSchema c_schema;
101- ICEBERG_RETURN_UNEXPECTED (ToArrowSchema (*options. schema , &c_schema));
186+ ICEBERG_RETURN_UNEXPECTED (ToArrowSchema (*write_schema_ , &c_schema));
102187 ICEBERG_ARROW_ASSIGN_OR_RETURN (arrow_schema_, ::arrow::ImportSchema (&c_schema));
103188
104189 std::shared_ptr<::parquet::SchemaDescriptor> schema_descriptor;
@@ -123,8 +208,12 @@ class ParquetWriter::Impl {
123208 }
124209
125210 Status Write (ArrowArray* array) {
126- ICEBERG_ARROW_ASSIGN_OR_RETURN (auto batch,
127- ::arrow::ImportRecordBatch (array, arrow_schema_));
211+ ICEBERG_ARROW_ASSIGN_OR_RETURN (
212+ auto input_batch, ::arrow::ImportRecordBatch (array, input_arrow_schema_));
213+ ICEBERG_ASSIGN_OR_RAISE (
214+ auto batch,
215+ ProjectRecordBatch (std::move (input_batch), arrow_schema_, *write_schema_,
216+ write_projection_, arrow::MetadataColumnContext{}, pool_));
128217
129218 ICEBERG_ARROW_RETURN_NOT_OK (writer_->WriteRecordBatch (*batch));
130219
@@ -168,6 +257,11 @@ class ParquetWriter::Impl {
168257 private:
169258 // TODO(gangwu): make memory pool configurable
170259 ::arrow::MemoryPool* pool_ = ::arrow::default_memory_pool();
260+ // The schema accepted from callers.
261+ std::shared_ptr<::arrow::Schema> input_arrow_schema_;
262+ // The Iceberg schema that has v3 unknown fields removed for physical writes.
263+ std::shared_ptr<Schema> write_schema_;
264+ SchemaProjection write_projection_;
171265 // Schema to write to the Parquet file.
172266 std::shared_ptr<::arrow::Schema> arrow_schema_;
173267 // The output stream to write Parquet file.
0 commit comments