@@ -7,17 +7,20 @@ use crate::{
77use bytes:: Bytes ;
88use geoparquet:: {
99 reader:: { GeoParquetReaderBuilder , GeoParquetRecordBatchReader } ,
10- writer:: GeoParquetWriterOptions ,
10+ writer:: { GeoParquetRecordBatchEncoder , GeoParquetWriterOptionsBuilder } ,
1111} ;
1212use parquet:: {
13- arrow:: arrow_reader:: ParquetRecordBatchReaderBuilder ,
13+ arrow:: { ArrowWriter , arrow_reader:: ParquetRecordBatchReaderBuilder } ,
1414 file:: { properties:: WriterProperties , reader:: ChunkReader } ,
1515 format:: KeyValue ,
1616} ;
1717use std:: io:: Write ;
1818
1919pub use parquet:: basic:: Compression ;
2020
21+ /// Default stac-geoparquet compression
22+ pub const DEFAULT_COMPRESSION : Compression = Compression :: SNAPPY ;
23+
2124/// Reads a [ItemCollection] from a [ChunkReader] as
2225/// [stac-geoparquet](https://github.com/stac-utils/stac-geoparquet).
2326///
@@ -65,7 +68,7 @@ pub fn into_writer<W>(writer: W, item_collection: impl Into<ItemCollection>) ->
6568where
6669 W : Write + Send ,
6770{
68- into_writer_with_options ( writer, item_collection, Default :: default ( ) )
71+ WriterBuilder :: new ( writer, item_collection) . write ( )
6972}
7073
7174/// Writes a [ItemCollection] to a [std::io::Write] as
7780/// use std::io::Cursor;
7881/// use stac::{Item, geoparquet::Compression};
7982///
80- /// # #[cfg(feature = "geoparquet-compression")]
81- /// # {
8283/// let item: Item = stac::read("examples/simple-item.json").unwrap();
8384/// let mut cursor = Cursor::new(Vec::new());
8485/// stac::geoparquet::into_writer_with_compression(&mut cursor, vec![item], Compression::SNAPPY).unwrap();
85- /// # }
8686/// ```
8787pub fn into_writer_with_compression < W > (
8888 writer : W ,
@@ -92,50 +92,59 @@ pub fn into_writer_with_compression<W>(
9292where
9393 W : Write + Send ,
9494{
95- let mut options = GeoParquetWriterOptions :: default ( ) ;
96- let writer_properties = WriterProperties :: builder ( )
97- . set_compression ( compression)
98- . set_key_value_metadata ( Some ( vec ! [ KeyValue {
99- key: VERSION_KEY . to_string( ) ,
100- value: Some ( VERSION . to_string( ) ) ,
101- } ] ) )
102- . build ( ) ;
103- options. writer_properties = Some ( writer_properties) ;
104- into_writer_with_options ( writer, item_collection, options)
95+ WriterBuilder :: new ( writer, item_collection)
96+ . compression ( compression)
97+ . write ( )
10598}
10699
107- /// Writes a [ItemCollection] to a [std::io::Write] as
108- /// [stac-geoparquet](https://github.com/stac-utils/stac-geoparquet) with the provided options.
109- ///
110- /// # Examples
111- ///
112- /// ```
113- /// use std::io::Cursor;
114- /// use stac::{Item, geoparquet::Compression};
115- ///
116- /// let item: Item = stac::read("examples/simple-item.json").unwrap();
117- /// let mut cursor = Cursor::new(Vec::new());
118- /// stac::geoparquet::into_writer_with_options(&mut cursor, vec![item], Default::default()).unwrap();
119- /// ```
120- pub fn into_writer_with_options < W > (
100+ struct WriterBuilder < W : Write + Send > {
121101 writer : W ,
122- item_collection : impl Into < ItemCollection > ,
123- mut options : GeoParquetWriterOptions ,
124- ) -> Result < ( ) >
125- where
126- W : Write + Send ,
127- {
128- if let Some ( primary_column) = options. primary_column . as_deref ( ) {
129- if primary_column != "geometry" {
130- log:: warn!( "primary column not set to 'geometry'" ) ;
102+ item_collection : ItemCollection ,
103+ compression : Option < Compression > ,
104+ }
105+
106+ impl < W : Write + Send > WriterBuilder < W > {
107+ fn new ( writer : W , item_collection : impl Into < ItemCollection > ) -> WriterBuilder < W > {
108+ WriterBuilder {
109+ writer,
110+ item_collection : item_collection. into ( ) ,
111+ compression : Some ( DEFAULT_COMPRESSION ) ,
131112 }
132- } else {
133- options. primary_column = Some ( "geometry" . to_string ( ) ) ;
134113 }
135- let table = Table :: from_item_collection ( item_collection) ?;
136- geoparquet:: writer:: write_geoparquet ( Box :: new ( table. into_reader ( ) ) , writer, & options) ?;
137- Ok ( ( ) )
114+
115+ fn compression ( mut self , compression : impl Into < Option < Compression > > ) -> WriterBuilder < W > {
116+ self . compression = compression. into ( ) ;
117+ self
118+ }
119+
120+ fn write ( self ) -> Result < ( ) > {
121+ let ( record_batches, schema) =
122+ Table :: from_item_collection ( self . item_collection ) ?. into_inner ( ) ;
123+ let options = GeoParquetWriterOptionsBuilder :: default ( )
124+ . set_primary_column ( "geometry" . to_string ( ) )
125+ . build ( ) ;
126+ let mut encoder = GeoParquetRecordBatchEncoder :: try_new ( & schema, & options) ?;
127+ let mut builder = WriterProperties :: builder ( ) ;
128+ if let Some ( compression) = self . compression {
129+ builder = builder. set_compression ( compression) ;
130+ }
131+ let properties = builder. build ( ) ;
132+ let mut writer =
133+ ArrowWriter :: try_new ( self . writer , encoder. target_schema ( ) , Some ( properties) ) ?;
134+ for record_batch in record_batches {
135+ let record_batch = encoder. encode_record_batch ( & record_batch) ?;
136+ writer. write ( & record_batch) ?;
137+ }
138+ writer. append_key_value_metadata ( encoder. into_keyvalue ( ) ?) ;
139+ writer. append_key_value_metadata ( KeyValue :: new (
140+ VERSION_KEY . to_string ( ) ,
141+ Some ( VERSION . to_string ( ) ) ,
142+ ) ) ;
143+ let _ = writer. finish ( ) ?;
144+ Ok ( ( ) )
145+ }
138146}
147+
139148/// Create a STAC object from geoparquet data.
140149pub trait FromGeoparquet : Sized {
141150 /// Creates a STAC object from geoparquet bytes.
@@ -329,7 +338,7 @@ mod tests {
329338 . file_metadata ( )
330339 . key_value_metadata ( )
331340 . unwrap ( )
332- . into_iter ( )
341+ . iter ( )
333342 . find ( |key_value| key_value. key == "geo" )
334343 . unwrap ( ) ;
335344 let value: serde_json:: Value =
0 commit comments