Skip to content

Commit a591cf2

Browse files
authored
refactor: some of the geoparquet code (#768)
1 parent e2f92de commit a591cf2

File tree

4 files changed

+95
-101
lines changed

4 files changed

+95
-101
lines changed

Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,9 @@ futures-util = "0.3.31"
5656
geo = "0.30.0"
5757
geo-traits = "0.3.0"
5858
geo-types = "0.7.16"
59-
geoarrow-array = { git = "https://github.com/geoarrow/geoarrow-rs/", rev = "d27500849c6cee019535d6749991d1fd122baecf" }
60-
geoparquet = { git = "https://github.com/geoarrow/geoarrow-rs/", rev = "d27500849c6cee019535d6749991d1fd122baecf" }
61-
geoarrow-schema = { git = "https://github.com/geoarrow/geoarrow-rs/", rev = "d27500849c6cee019535d6749991d1fd122baecf" }
59+
geoarrow-array = { git = "https://github.com/geoarrow/geoarrow-rs/", rev = "1d73a8c8f739ac2a9f4cbac928283c19b74db463" }
60+
geoparquet = { git = "https://github.com/geoarrow/geoarrow-rs/", rev = "1d73a8c8f739ac2a9f4cbac928283c19b74db463" }
61+
geoarrow-schema = { git = "https://github.com/geoarrow/geoarrow-rs/", rev = "1d73a8c8f739ac2a9f4cbac928283c19b74db463" }
6262
geojson = "0.24.1"
6363
getrandom = { version = "0.3.3", features = ["wasm_js"] }
6464
http = "1.1"

crates/core/src/error.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,11 @@ pub enum Error {
5858
#[error("no \"{0}\" field in the JSON object")]
5959
MissingField(&'static str),
6060

61+
/// No geoparquet metadata in a stac-geoparquet file.
62+
#[error("no geoparquet metadata")]
63+
#[cfg(feature = "geoparquet")]
64+
MissingGeoparquetMetadata,
65+
6166
/// There are no items, when items are required.
6267
#[error("no items")]
6368
NoItems,

crates/core/src/geoarrow/mod.rs

Lines changed: 71 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use geoarrow_array::{
1212
array::{WkbArray, from_arrow_array},
1313
builder::GeometryBuilder,
1414
};
15-
use geoarrow_schema::{CoordType, GeoArrowType, GeometryType, Metadata};
15+
use geoarrow_schema::{GeoArrowType, GeometryType, Metadata};
1616
use serde_json::{Value, json};
1717
use std::{collections::HashMap, sync::Arc};
1818

@@ -22,10 +22,10 @@ pub const VERSION_KEY: &str = "stac:geoparquet_version";
2222
/// The stac-geoparquet version.
2323
pub const VERSION: &str = "1.0.0";
2424

25-
/// Other geoarrow geometry columns (other than "geometry")
26-
pub const OTHER_GEOMETRY_COLUMNS: [&str; 1] = ["proj:geometry"];
25+
/// Geometry columns.
26+
pub const GEOMETRY_COLUMNS: [&str; 2] = ["geometry", "proj:geometry"];
2727

28-
/// Geoarrow datetime columns
28+
/// Datetime columns.
2929
pub const DATETIME_COLUMNS: [&str; 8] = [
3030
"datetime",
3131
"start_datetime",
@@ -37,7 +37,7 @@ pub const DATETIME_COLUMNS: [&str; 8] = [
3737
"unpublished",
3838
];
3939

40-
/// A geoarrow table.
40+
/// A **stac-geoarrow** table.
4141
///
4242
/// `Table` existed in **geoarrow** v0.3 but was removed in v0.4. We preserve it
4343
/// here as a useful arrow-ish analog to [ItemCollection].
@@ -48,19 +48,6 @@ pub struct Table {
4848
}
4949

5050
/// A builder for converting an [ItemCollection] to a [Table]
51-
///
52-
/// # Examples
53-
///
54-
/// ```
55-
/// use stac::geoarrow::TableBuilder;
56-
///
57-
/// let item = stac::read("examples/simple-item.json").unwrap();
58-
/// let builder = TableBuilder {
59-
/// item_collection: vec![item].into(),
60-
/// drop_invalid_attributes: false,
61-
/// };
62-
/// let table = builder.build().unwrap();
63-
/// ```
6451
#[derive(Debug)]
6552
pub struct TableBuilder {
6653
/// The item collection.
@@ -70,73 +57,60 @@ pub struct TableBuilder {
7057
///
7158
/// If false, an invalid attribute will cause an error. If true, an invalid
7259
/// attribute will trigger a warning.
60+
///
61+
/// Invalid attributes are values in `properties` that would conflict with a STAC-defined top-level key.
7362
pub drop_invalid_attributes: bool,
7463
}
7564

7665
impl TableBuilder {
77-
/// Builds a [Table]
66+
/// Builds a [Table].
67+
///
68+
/// # Examples
69+
///
70+
/// ```
71+
/// use stac::geoarrow::TableBuilder;
72+
///
73+
/// let item = stac::read("examples/simple-item.json").unwrap();
74+
/// let builder = TableBuilder {
75+
/// item_collection: vec![item].into(),
76+
/// drop_invalid_attributes: false,
77+
/// };
78+
/// let table = builder.build().unwrap();
79+
/// ```
7880
pub fn build(self) -> Result<Table> {
7981
let mut values = Vec::with_capacity(self.item_collection.items.len());
80-
let geometry_type = GeometryType::new(CoordType::Interleaved, Default::default()); // the choice of interleaved is arbitrary
81-
let mut builder = GeometryBuilder::new(geometry_type.clone());
8282
let mut geometry_builders = HashMap::new();
83-
for mut item in self.item_collection.items {
84-
builder.push_geometry(
85-
item.geometry
86-
.take()
87-
.and_then(|geometry| Geometry::try_from(geometry).ok())
88-
.as_ref(),
89-
)?;
90-
let flat_item = item.into_flat_item(self.drop_invalid_attributes)?;
91-
let mut value = serde_json::to_value(flat_item)?;
83+
84+
for item in self.item_collection.items {
85+
let mut value =
86+
serde_json::to_value(item.into_flat_item(self.drop_invalid_attributes)?)?;
9287
{
9388
let value = value
9489
.as_object_mut()
9590
.expect("a flat item should serialize to an object");
96-
for key in OTHER_GEOMETRY_COLUMNS {
91+
for key in GEOMETRY_COLUMNS {
9792
if let Some(value) = value.remove(key) {
98-
let entry = geometry_builders
99-
.entry(key)
100-
.or_insert_with(|| GeometryBuilder::new(geometry_type.clone()));
93+
let entry = geometry_builders.entry(key).or_insert_with(|| {
94+
let geometry_type = GeometryType::new(Default::default());
95+
GeometryBuilder::new(geometry_type)
96+
});
10197
let geometry =
10298
geojson::Geometry::from_json_value(value).map_err(Box::new)?;
10399
entry.push_geometry(Some(
104100
&(Geometry::try_from(geometry).map_err(Box::new)?),
105101
))?;
106102
}
107103
}
108-
let _ = value.remove("geometry");
109104
if let Some(bbox) = value.remove("bbox") {
110-
let bbox = bbox
111-
.as_array()
112-
.expect("STAC items should always have a list as their bbox");
113-
if bbox.len() == 4 {
114-
let _ = value.insert("bbox".into(), json!({
115-
"xmin": bbox[0].as_number().expect("all bbox values should be a number"),
116-
"ymin": bbox[1].as_number().expect("all bbox values should be a number"),
117-
"xmax": bbox[2].as_number().expect("all bbox values should be a number"),
118-
"ymax": bbox[3].as_number().expect("all bbox values should be a number"),
119-
}));
120-
} else if bbox.len() == 6 {
121-
let _ = value.insert("bbox".into(), json!({
122-
"xmin": bbox[0].as_number().expect("all bbox values should be a number"),
123-
"ymin": bbox[1].as_number().expect("all bbox values should be a number"),
124-
"zmin": bbox[2].as_number().expect("all bbox values should be a number"),
125-
"xmax": bbox[3].as_number().expect("all bbox values should be a number"),
126-
"ymax": bbox[4].as_number().expect("all bbox values should be a number"),
127-
"zmax": bbox[5].as_number().expect("all bbox values should be a number"),
128-
}));
129-
} else {
130-
return Err(Error::InvalidBbox(
131-
bbox.iter().filter_map(|v| v.as_f64()).collect(),
132-
));
133-
}
105+
let bbox = convert_bbox(bbox)?;
106+
let _ = value.insert("bbox".to_string(), bbox);
134107
}
135108
}
136109
values.push(value);
137110
}
138111

139-
// Decode JSON
112+
// Create a geometry-less record batch of our items.
113+
// TODO do this in one pass: https://github.com/stac-utils/rustac/issues/767
140114
let schema = arrow_json::reader::infer_json_schema_from_iterator(values.iter().map(Ok))?;
141115
let mut schema_builder = SchemaBuilder::new();
142116
for field in schema.fields().iter() {
@@ -150,29 +124,23 @@ impl TableBuilder {
150124
schema_builder.push(field.clone());
151125
}
152126
}
153-
let mut metadata = schema.metadata;
154-
let _ = metadata.insert(VERSION_KEY.to_string(), VERSION.into());
155127
let schema = Arc::new(schema_builder.finish());
156128
let mut decoder = ReaderBuilder::new(schema.clone()).build_decoder()?;
157129
decoder.serialize(&values)?;
158-
159-
// Build the table
160130
let record_batch = decoder.flush()?.ok_or(Error::NoItems)?;
131+
132+
// Add the geometries back in.
161133
let mut schema_builder = SchemaBuilder::from(schema.fields());
162134
let mut columns = record_batch.columns().to_vec();
163-
164-
let geometry_array = builder.finish();
165-
columns.push(geometry_array.to_array_ref());
166-
schema_builder.push(geometry_array.data_type().to_field("geometry", true));
167-
168135
for (key, geometry_builder) in geometry_builders {
169136
let geometry_array = geometry_builder.finish();
170137
columns.push(geometry_array.to_array_ref());
171138
schema_builder.push(geometry_array.data_type().to_field(key, true));
172139
}
173-
174-
// Build the table
175-
let schema = Arc::new(schema_builder.finish().with_metadata(metadata));
140+
let _ = schema_builder
141+
.metadata_mut()
142+
.insert(VERSION_KEY.to_string(), VERSION.into());
143+
let schema = Arc::new(schema_builder.finish());
176144
let record_batch = RecordBatch::try_new(schema.clone(), columns)?;
177145
Ok(Table {
178146
record_batches: vec![record_batch],
@@ -266,14 +234,11 @@ pub fn with_native_geometry(
266234
);
267235
let geometry_array = geoarrow_array::cast::from_wkb(
268236
&wkb_array,
269-
GeoArrowType::Geometry(GeometryType::new(
270-
CoordType::Interleaved,
271-
Metadata::default().into(),
272-
)),
237+
GeoArrowType::Geometry(GeometryType::new(Metadata::default().into())),
273238
)?;
274239
let mut columns = record_batch.columns().to_vec();
275240
let mut schema_builder = SchemaBuilder::from(&*record_batch.schema());
276-
schema_builder.push(geometry_array.data_type().to_field("geometry", true));
241+
schema_builder.push(geometry_array.data_type().to_field(column_name, true));
277242
let schema = schema_builder.finish();
278243
columns.push(geometry_array.to_array_ref());
279244
record_batch = RecordBatch::try_new(schema.into(), columns)?;
@@ -290,7 +255,7 @@ pub fn with_wkb_geometry(mut record_batch: RecordBatch, column_name: &str) -> Re
290255
)?;
291256
let mut columns = record_batch.columns().to_vec();
292257
let mut schema_builder = SchemaBuilder::from(&*record_batch.schema());
293-
schema_builder.push(wkb_array.data_type().to_field("geometry", true));
258+
schema_builder.push(wkb_array.data_type().to_field(column_name, true));
294259
let schema = schema_builder.finish();
295260
columns.push(wkb_array.to_array_ref());
296261
record_batch = RecordBatch::try_new(schema.into(), columns)?;
@@ -316,14 +281,37 @@ pub fn add_wkb_metadata(mut record_batch: RecordBatch, column_name: &str) -> Res
316281
Ok(record_batch)
317282
}
318283

319-
// We only run tests when the geoparquet feature is enabled so that we don't
320-
// have to add geoarrow as a dev dependency for all builds.
321-
#[cfg(all(test, feature = "geoparquet"))]
284+
fn convert_bbox(bbox: Value) -> Result<Value> {
285+
let bbox = bbox
286+
.as_array()
287+
.expect("STAC items should always have a list as their bbox");
288+
if bbox.len() == 4 {
289+
Ok(json!({
290+
"xmin": bbox[0].as_number().expect("all bbox values should be a number"),
291+
"ymin": bbox[1].as_number().expect("all bbox values should be a number"),
292+
"xmax": bbox[2].as_number().expect("all bbox values should be a number"),
293+
"ymax": bbox[3].as_number().expect("all bbox values should be a number"),
294+
}))
295+
} else if bbox.len() == 6 {
296+
Ok(json!({
297+
"xmin": bbox[0].as_number().expect("all bbox values should be a number"),
298+
"ymin": bbox[1].as_number().expect("all bbox values should be a number"),
299+
"zmin": bbox[2].as_number().expect("all bbox values should be a number"),
300+
"xmax": bbox[3].as_number().expect("all bbox values should be a number"),
301+
"ymax": bbox[4].as_number().expect("all bbox values should be a number"),
302+
"zmax": bbox[5].as_number().expect("all bbox values should be a number"),
303+
}))
304+
} else {
305+
Err(Error::InvalidBbox(
306+
bbox.iter().filter_map(|v| v.as_f64()).collect(),
307+
))
308+
}
309+
}
310+
311+
#[cfg(test)]
322312
mod tests {
323313
use super::Table;
324314
use crate::{Item, ItemCollection};
325-
use geoparquet::GeoParquetRecordBatchReaderBuilder;
326-
use std::fs::File;
327315

328316
#[test]
329317
fn to_table() {
@@ -339,17 +327,6 @@ mod tests {
339327
let _ = table.schema().field_with_name("type").unwrap();
340328
}
341329

342-
#[test]
343-
fn from_table() {
344-
let file = File::open("data/extended-item.parquet").unwrap();
345-
let reader = GeoParquetRecordBatchReaderBuilder::try_new(file)
346-
.unwrap()
347-
.build()
348-
.unwrap();
349-
let item_collection = super::from_record_batch_reader(reader).unwrap();
350-
assert_eq!(item_collection.items.len(), 1);
351-
}
352-
353330
#[test]
354331
fn roundtrip() {
355332
let item: Item = crate::read("examples/simple-item.json").unwrap();

crates/core/src/geoparquet.rs

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,16 @@
11
//! Read data from and write data in [stac-geoparquet](https://github.com/stac-utils/stac-geoparquet/blob/main/spec/stac-geoparquet-spec.md).
22
33
use crate::{
4-
Catalog, Collection, Item, ItemCollection, Result, Value,
4+
Catalog, Collection, Error, Item, ItemCollection, Result, Value,
55
geoarrow::{Table, VERSION, VERSION_KEY},
66
};
77
use bytes::Bytes;
8-
use geoparquet::{GeoParquetRecordBatchReaderBuilder, GeoParquetWriterOptions};
8+
use geoparquet::{
9+
reader::{GeoParquetReaderBuilder, GeoParquetRecordBatchReader},
10+
writer::GeoParquetWriterOptions,
11+
};
912
use parquet::{
13+
arrow::arrow_reader::ParquetRecordBatchReaderBuilder,
1014
file::{properties::WriterProperties, reader::ChunkReader},
1115
format::KeyValue,
1216
};
@@ -29,7 +33,15 @@ pub fn from_reader<R>(reader: R) -> Result<ItemCollection>
2933
where
3034
R: ChunkReader + 'static,
3135
{
32-
let reader = GeoParquetRecordBatchReaderBuilder::try_new(reader)?.build()?;
36+
let builder = ParquetRecordBatchReaderBuilder::try_new(reader)?;
37+
let geoparquet_metadata = builder
38+
.geoparquet_metadata()
39+
.transpose()?
40+
.ok_or(Error::MissingGeoparquetMetadata)?;
41+
let geoarrow_schema =
42+
builder.geoarrow_schema(&geoparquet_metadata, true, Default::default())?;
43+
let reader = builder.build()?;
44+
let reader = GeoParquetRecordBatchReader::try_new(reader, geoarrow_schema)?;
3345
crate::geoarrow::from_record_batch_reader(reader)
3446
}
3547

@@ -121,7 +133,7 @@ where
121133
options.primary_column = Some("geometry".to_string());
122134
}
123135
let table = Table::from_item_collection(item_collection)?;
124-
geoparquet::write_geoparquet(Box::new(table.into_reader()), writer, &options)?;
136+
geoparquet::writer::write_geoparquet(Box::new(table.into_reader()), writer, &options)?;
125137
Ok(())
126138
}
127139
/// Create a STAC object from geoparquet data.

0 commit comments

Comments
 (0)