//! Ok(())
//! }
//! ```
//!
//! # Adding Partitioning to Data File Writers
//!
//! You can wrap a `DataFileWriter` with partitioning writers to handle partitioned tables.
//! Iceberg provides two partitioning strategies:
//!
//! ## FanoutWriter - For Unsorted Data
//!
//! Wraps the data file writer to handle unsorted data by maintaining multiple active writers.
//! Use this when your data is not pre-sorted by partition key. Writes to different partitions
//! can happen in any order, even interleaved.
//!
//! ```rust, no_run
//! # // Same setup as the simple example above...
//! # use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
//! # use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
//! # use iceberg::{Catalog, CatalogBuilder, Result, TableIdent};
//! # use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
//! # use iceberg::writer::file_writer::ParquetWriterBuilder;
//! # use iceberg::writer::file_writer::location_generator::{
//! #     DefaultFileNameGenerator, DefaultLocationGenerator,
//! # };
//! # use parquet::file::properties::WriterProperties;
//! # use std::collections::HashMap;
//! # #[tokio::main]
//! # async fn main() -> Result<()> {
//! # let catalog = MemoryCatalogBuilder::default()
//! #     .load("memory", HashMap::from([(MEMORY_CATALOG_WAREHOUSE.to_string(), "file:///path/to/warehouse".to_string())]))
//! #     .await?;
//! # let table = catalog.load_table(&TableIdent::from_strs(["hello", "world"])?).await?;
//! # let location_generator = DefaultLocationGenerator::new(table.metadata().clone()).unwrap();
//! # let file_name_generator = DefaultFileNameGenerator::new("test".to_string(), None, iceberg::spec::DataFileFormat::Parquet);
//! # let parquet_writer_builder = ParquetWriterBuilder::new(WriterProperties::default(), table.metadata().current_schema().clone());
//! # let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
//! #     parquet_writer_builder, table.file_io().clone(), location_generator, file_name_generator);
//! # let data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder);
//!
//! // Wrap the data file writer with FanoutWriter for partitioning
//! use iceberg::writer::partitioning::fanout_writer::FanoutWriter;
//! use iceberg::writer::partitioning::PartitioningWriter;
//! use iceberg::spec::{Literal, PartitionKey, Struct};
//!
//! let mut fanout_writer = FanoutWriter::new(data_file_writer_builder);
//!
//! // Create partition keys for different regions
//! let schema = table.metadata().current_schema().clone();
//! let partition_spec = table.metadata().default_partition_spec().as_ref().clone();
//!
//! let partition_key_us = PartitionKey::new(
//!     partition_spec.clone(),
//!     schema.clone(),
//!     Struct::from_iter([Some(Literal::string("US"))]),
//! );
//!
//! let partition_key_eu = PartitionKey::new(
//!     partition_spec.clone(),
//!     schema.clone(),
//!     Struct::from_iter([Some(Literal::string("EU"))]),
//! );
//!
//! // Write to different partitions in any order - can interleave partition writes
//! // fanout_writer.write(partition_key_us.clone(), batch_us1).await?;
//! // fanout_writer.write(partition_key_eu.clone(), batch_eu1).await?;
//! // fanout_writer.write(partition_key_us.clone(), batch_us2).await?; // Back to US - OK!
//! // fanout_writer.write(partition_key_eu.clone(), batch_eu2).await?; // Back to EU - OK!
//!
//! let data_files = fanout_writer.close().await?;
//! # Ok(())
//! # }
//! ```
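//!
//! The `batch_us1`, `batch_eu1`, ... values in the commented-out calls above are ordinary Arrow
//! `RecordBatch`es whose columns match the table schema. Below is a rough sketch, assuming a
//! hypothetical table with an `id` integer column and a `region` string column; in a real
//! pipeline you would normally derive the Arrow schema from the table's Iceberg schema instead
//! (for example with `iceberg::arrow::schema_to_arrow_schema`) so that Iceberg field IDs line up:
//!
//! ```rust, no_run
//! use std::sync::Arc;
//!
//! use arrow_array::{Int32Array, RecordBatch, StringArray};
//! use arrow_schema::{DataType, Field, Schema};
//!
//! // Hypothetical Arrow schema mirroring the assumed table layout (id: int, region: string).
//! let arrow_schema = Arc::new(Schema::new(vec![
//!     Field::new("id", DataType::Int32, false),
//!     Field::new("region", DataType::Utf8, false),
//! ]));
//!
//! // Every row in this batch belongs to the "US" partition.
//! let batch_us1 = RecordBatch::try_new(
//!     arrow_schema,
//!     vec![
//!         Arc::new(Int32Array::from(vec![1, 2])),
//!         Arc::new(StringArray::from(vec!["US", "US"])),
//!     ],
//! )
//! .unwrap();
//! ```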
//!
//! ## ClusteredWriter - For Sorted Data
//!
//! Wraps the data file writer for pre-sorted data. It is more memory efficient than `FanoutWriter`
//! because it keeps only one active writer at a time, but it requires the input to be sorted by
//! partition key.
//!
//! ```rust, no_run
//! # // Same setup as the simple example above...
//! # use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
//! # use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
//! # use iceberg::{Catalog, CatalogBuilder, Result, TableIdent};
//! # use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
//! # use iceberg::writer::file_writer::ParquetWriterBuilder;
//! # use iceberg::writer::file_writer::location_generator::{
//! #     DefaultFileNameGenerator, DefaultLocationGenerator,
//! # };
//! # use parquet::file::properties::WriterProperties;
//! # use std::collections::HashMap;
//! # #[tokio::main]
//! # async fn main() -> Result<()> {
//! # let catalog = MemoryCatalogBuilder::default()
//! #     .load("memory", HashMap::from([(MEMORY_CATALOG_WAREHOUSE.to_string(), "file:///path/to/warehouse".to_string())]))
//! #     .await?;
//! # let table = catalog.load_table(&TableIdent::from_strs(["hello", "world"])?).await?;
//! # let location_generator = DefaultLocationGenerator::new(table.metadata().clone()).unwrap();
//! # let file_name_generator = DefaultFileNameGenerator::new("test".to_string(), None, iceberg::spec::DataFileFormat::Parquet);
//! # let parquet_writer_builder = ParquetWriterBuilder::new(WriterProperties::default(), table.metadata().current_schema().clone());
//! # let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
//! #     parquet_writer_builder, table.file_io().clone(), location_generator, file_name_generator);
//! # let data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder);
//!
//! // Wrap the data file writer with ClusteredWriter for sorted partitioning
//! use iceberg::writer::partitioning::clustered_writer::ClusteredWriter;
//! use iceberg::writer::partitioning::PartitioningWriter;
//! use iceberg::spec::{Literal, PartitionKey, Struct};
//!
//! let mut clustered_writer = ClusteredWriter::new(data_file_writer_builder);
//!
//! // Create partition keys (must write in sorted order)
//! let schema = table.metadata().current_schema().clone();
//! let partition_spec = table.metadata().default_partition_spec().as_ref().clone();
//!
//! let partition_key_asia = PartitionKey::new(
//!     partition_spec.clone(),
//!     schema.clone(),
//!     Struct::from_iter([Some(Literal::string("ASIA"))]),
//! );
//!
//! let partition_key_eu = PartitionKey::new(
//!     partition_spec.clone(),
//!     schema.clone(),
//!     Struct::from_iter([Some(Literal::string("EU"))]),
//! );
//!
//! let partition_key_us = PartitionKey::new(
//!     partition_spec.clone(),
//!     schema.clone(),
//!     Struct::from_iter([Some(Literal::string("US"))]),
//! );
//!
//! // Write to partitions in sorted order (ASIA -> EU -> US)
//! // clustered_writer.write(partition_key_asia, batch_asia).await?;
//! // clustered_writer.write(partition_key_eu, batch_eu).await?;
//! // clustered_writer.write(partition_key_us, batch_us).await?;
//! // Writing back to ASIA would fail since data must be sorted!
//!
//! let data_files = clustered_writer.close().await?;
//! # Ok(())
//! # }
//! ```
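//!
//! In both examples above, the `Struct` passed to `PartitionKey::new` holds one value per field
//! of the partition spec, in the order the fields are declared in the spec. As a sketch (for a
//! hypothetical spec partitioned by both a region string and a year integer, not the single-field
//! spec used above), the key would carry two literals:
//!
//! ```rust, no_run
//! use iceberg::spec::{Literal, Struct};
//!
//! // One entry per partition field, in partition-spec order; `None` would represent a null value.
//! let partition_values = Struct::from_iter([
//!     Some(Literal::string("US")),
//!     Some(Literal::int(2024)),
//! ]);
//! ```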
|
pub mod base_writer;
pub mod file_writer;
pub mod partitioning;
|
use arrow_array::RecordBatch;
|