diff --git a/stac_geoparquet/arrow/_crs.py b/stac_geoparquet/arrow/_crs.py
new file mode 100644
index 0000000..2c72ffd
--- /dev/null
+++ b/stac_geoparquet/arrow/_crs.py
@@ -0,0 +1,3 @@
+from pyproj import CRS
+
+WGS84_CRS_JSON = CRS.from_epsg(4326).to_json_dict()
diff --git a/stac_geoparquet/arrow/_from_arrow.py b/stac_geoparquet/arrow/_from_arrow.py
index f940864..ec717ae 100644
--- a/stac_geoparquet/arrow/_from_arrow.py
+++ b/stac_geoparquet/arrow/_from_arrow.py
@@ -1,13 +1,17 @@
 """Convert STAC Items in Arrow Table format to JSON Lines or Python dicts."""
 
-import os
 import json
-from typing import Iterable, List, Union
+import operator
+import os
+from functools import reduce
+from typing import Any, Dict, Iterable, List, Sequence, Tuple, Union
 
 import numpy as np
 import pyarrow as pa
 import pyarrow.compute as pc
 import shapely
+from numpy.typing import NDArray
+import shapely.geometry
 
 
 def stac_table_to_ndjson(table: pa.Table, dest: Union[str, os.PathLike[str]]) -> None:
@@ -22,20 +26,46 @@ def stac_table_to_items(table: pa.Table) -> Iterable[dict]:
     """Convert a STAC Table to a generator of STAC Item `dict`s"""
     table = _undo_stac_table_transformations(table)
 
-    # Convert WKB geometry column to GeoJSON, and then assign the geojson geometry when
-    # converting each row to a dictionary.
-    for batch in table.to_batches():
-        geoms = shapely.from_wkb(batch["geometry"])
-        geojson_strings = shapely.to_geojson(geoms)
+    # Find all paths in the schema that have a WKB geometry
+    geometry_paths = [["geometry"]]
+    try:
+        table.schema.field("properties").type.field("proj:geometry")
+        geometry_paths.append(["properties", "proj:geometry"])
+    except KeyError:
+        pass
 
-        # RecordBatch is missing a `drop()` method, so we keep all columns other than
-        # geometry instead
-        keep_column_names = [name for name in batch.column_names if name != "geometry"]
-        struct_batch = batch.select(keep_column_names).to_struct_array()
+    assets_struct = table.schema.field("assets").type
+    for asset_idx in range(assets_struct.num_fields):
+        asset_field = assets_struct.field(asset_idx)
+        if "proj:geometry" in pa.schema(asset_field.type).names:
+            geometry_paths.append(["assets", asset_field.name, "proj:geometry"])
 
+    for batch in table.to_batches():
+        # Convert each geometry column to a Shapely geometry, and then assign the
+        # GeoJSON geometry when converting each row to a dictionary.
+        geometries: List[NDArray[np.object_]] = []
+        for geometry_path in geometry_paths:
+            col = batch
+            for path_segment in geometry_path:
+                if isinstance(col, pa.RecordBatch):
+                    col = col[path_segment]
+                elif pa.types.is_struct(col.type):
+                    col = pc.struct_field(col, path_segment)
+                else:
+                    raise AssertionError(f"unexpected type {type(col)}")
+
+            geometries.append(shapely.from_wkb(col))
+
+        struct_batch = batch.to_struct_array()
         for row_idx in range(len(struct_batch)):
             row_dict = struct_batch[row_idx].as_py()
-            row_dict["geometry"] = json.loads(geojson_strings[row_idx])
+            for geometry_path, geometry_column in zip(geometry_paths, geometries):
+                geojson_g = geometry_column[row_idx].__geo_interface__
+                geojson_g["coordinates"] = convert_tuples_to_lists(
+                    geojson_g["coordinates"]
+                )
+                set_by_path(row_dict, geometry_path, geojson_g)
+
             yield row_dict
 
 
@@ -164,3 +194,56 @@ def _convert_bbox_to_array(table: pa.Table) -> pa.Table:
         new_chunks.append(list_arr)
 
     return table.set_column(bbox_col_idx, "bbox", new_chunks)
+
+
+def convert_tuples_to_lists(t: List | Tuple) -> List[Any]:
+    """Convert tuples to lists, recursively
+
+    For example, converts:
+    ```
+    (
+        (
+            (-112.4820566, 38.1261015),
+            (-112.4816283, 38.1331311),
+            (-112.4833551, 38.1338897),
+            (-112.4832919, 38.1307687),
+            (-112.4855415, 38.1291793),
+            (-112.4820566, 38.1261015),
+        ),
+    )
+    ```
+
+    to
+
+    ```py
+    [
+        [
+            [-112.4820566, 38.1261015],
+            [-112.4816283, 38.1331311],
+            [-112.4833551, 38.1338897],
+            [-112.4832919, 38.1307687],
+            [-112.4855415, 38.1291793],
+            [-112.4820566, 38.1261015],
+        ]
+    ]
+    ```
+
+    From https://stackoverflow.com/a/1014669.
+    """
+    return list(map(convert_tuples_to_lists, t)) if isinstance(t, (list, tuple)) else t
+
+
+def get_by_path(root: Dict[str, Any], keys: Sequence[str]) -> Any:
+    """Access a nested object in root by item sequence.
+
+    From https://stackoverflow.com/a/14692747
+    """
+    return reduce(operator.getitem, keys, root)
+
+
+def set_by_path(root: Dict[str, Any], keys: Sequence[str], value: Any) -> None:
+    """Set a value in a nested object in root by item sequence.
+
+    From https://stackoverflow.com/a/14692747
+    """
+    get_by_path(root, keys[:-1])[keys[-1]] = value  # type: ignore
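Note on the `stac_table_to_items` rewrite above: going through `__geo_interface__` plus `convert_tuples_to_lists` avoids the per-geometry JSON string round-trip of `shapely.to_geojson`, but the tuples have to be converted so the output matches what `json.loads` would have produced. A minimal sketch of how the new helpers compose (assumes this patch is applied; the item dict and values are illustrative):

```py
from stac_geoparquet.arrow._from_arrow import convert_tuples_to_lists, set_by_path

item = {"properties": {"proj:geometry": None}}
# Shapely's __geo_interface__ returns coordinates as (nested) tuples
geojson_g = {"type": "Point", "coordinates": (-112.4820566, 38.1261015)}
geojson_g["coordinates"] = convert_tuples_to_lists(geojson_g["coordinates"])
# Write the GeoJSON dict back into the nested location it was read from
set_by_path(item, ["properties", "proj:geometry"], geojson_g)
assert item["properties"]["proj:geometry"]["coordinates"] == [-112.4820566, 38.1261015]
```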
diff --git a/stac_geoparquet/arrow/_schema/models.py b/stac_geoparquet/arrow/_schema/models.py
new file mode 100644
index 0000000..c3c0ecb
--- /dev/null
+++ b/stac_geoparquet/arrow/_schema/models.py
@@ -0,0 +1,70 @@
+import json
+from pathlib import Path
+from typing import Any, Dict, Iterable, Sequence, Union
+
+import pyarrow as pa
+
+from stac_geoparquet.arrow._util import stac_items_to_arrow
+
+
+class InferredSchema:
+    """
+    A schema representing the original STAC JSON with absolutely minimal modifications.
+
+    The only modification from the data is converting any geometry fields from GeoJSON
+    to WKB.
+    """
+
+    inner: pa.Schema
+    """The underlying Arrow schema."""
+
+    count: int
+    """The total number of items scanned."""
+
+    def __init__(self) -> None:
+        self.inner = pa.schema([])
+        self.count = 0
+
+    def update_from_ndjson(
+        self,
+        path: Union[Union[str, Path], Iterable[Union[str, Path]]],
+        *,
+        chunk_size: int = 65536,
+    ) -> None:
+        """
+        Update this inferred schema from one or more newline-delimited JSON STAC files.
+
+        Args:
+            path: One or more paths to files with STAC items.
+            chunk_size: The chunk size to load into memory at a time. Defaults to 65536.
+        """
+        # Handle multi-path input
+        if not isinstance(path, (str, Path)):
+            for p in path:
+                self.update_from_ndjson(p, chunk_size=chunk_size)
+
+            return
+
+        # Handle single-path input
+        with open(path) as f:
+            items = []
+            for line in f:
+                item = json.loads(line)
+                items.append(item)
+
+                if len(items) >= chunk_size:
+                    self.update_from_items(items)
+                    items = []
+
+            # Handle remainder
+            if len(items) > 0:
+                self.update_from_items(items)
+
+    def update_from_items(self, items: Sequence[Dict[str, Any]]) -> None:
+        """Update this inferred schema from a sequence of STAC Items."""
+        self.count += len(items)
+        current_schema = stac_items_to_arrow(items, schema=None).schema
+        new_schema = pa.unify_schemas(
+            [self.inner, current_schema], promote_options="permissive"
+        )
+        self.inner = new_schema
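`InferredSchema` makes the first pass of the two-pass conversion explicit and reusable: scan the input once to unify a schema, then reuse it for any number of reads. `pa.unify_schemas(..., promote_options="permissive")` is what lets chunks disagree on a field's type (for example, a field that is all-null in one chunk but typed in another) without failing the merge. A usage sketch (the `items.ndjson` path is illustrative):

```py
import pyarrow as pa

from stac_geoparquet.arrow._schema.models import InferredSchema

inferred = InferredSchema()
inferred.update_from_ndjson("items.ndjson", chunk_size=10_000)
assert isinstance(inferred.inner, pa.Schema)
print(f"Unified schema over {inferred.count} items")
```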
diff --git a/stac_geoparquet/arrow/_to_arrow.py b/stac_geoparquet/arrow/_to_arrow.py
index 5688e31..0918d34 100644
--- a/stac_geoparquet/arrow/_to_arrow.py
+++ b/stac_geoparquet/arrow/_to_arrow.py
@@ -1,10 +1,19 @@
 """Convert STAC data into Arrow tables"""
 
 import json
-from copy import deepcopy
 from datetime import datetime
 from pathlib import Path
-from typing import Any, Dict, List, Optional, Sequence, Union, Generator
+from typing import (
+    Any,
+    Dict,
+    Generator,
+    Iterable,
+    Iterator,
+    List,
+    Optional,
+    Sequence,
+    Union,
+)
 
 import ciso8601
 import numpy as np
@@ -13,7 +22,9 @@
 import shapely
 import shapely.geometry
 
-from stac_geoparquet.arrow._to_parquet import WGS84_CRS_JSON
+from stac_geoparquet.arrow._schema.models import InferredSchema
+from stac_geoparquet.arrow._crs import WGS84_CRS_JSON
+from stac_geoparquet.arrow._util import stac_items_to_arrow
 
 
 def _chunks(
@@ -28,8 +39,8 @@ def parse_stac_items_to_arrow(
     items: Sequence[Dict[str, Any]],
     *,
     chunk_size: int = 8192,
-    schema: Optional[pa.Schema] = None,
-    downcast: bool = True,
+    schema: Optional[Union[pa.Schema, InferredSchema]] = None,
+    downcast: bool = False,
 ) -> pa.Table:
     """Parse a collection of STAC Items to a :class:`pyarrow.Table`.
 
@@ -51,11 +62,14 @@
     """
 
     if schema is not None:
+        if isinstance(schema, InferredSchema):
+            schema = schema.inner
+
         # If schema is provided, then for better memory usage we parse input STAC items
         # to Arrow batches in chunks.
        batches = []
         for chunk in _chunks(items, chunk_size):
-            batches.append(_stac_items_to_arrow(chunk, schema=schema))
+            batches.append(stac_items_to_arrow(chunk, schema=schema))
 
         table = pa.Table.from_batches(batches, schema=schema)
     else:
@@ -63,48 +77,81 @@
         # else it would be possible for a STAC item late in the collection (after the
         # first chunk) to have a different schema and not match the schema inferred for
         # the first chunk.
-        table = pa.Table.from_batches([_stac_items_to_arrow(items)])
+        table = pa.Table.from_batches([stac_items_to_arrow(items)])
 
     return _process_arrow_table(table, downcast=downcast)
 
 
 def parse_stac_ndjson_to_arrow(
-    path: Union[str, Path],
+    path: Union[Union[str, Path], Iterable[Union[str, Path]]],
     *,
-    chunk_size: int = 8192,
-    schema: Optional[pa.Schema] = None,
-    downcast: bool = True,
-) -> pa.Table:
+    chunk_size: int = 65536,
+    schema: Optional[Union[pa.Schema, InferredSchema]] = None,
+) -> Iterator[pa.RecordBatch]:
+    """
+    Convert one or more newline-delimited JSON STAC files to a generator of Arrow
+    RecordBatches.
+
+    Each RecordBatch in the returned iterator is guaranteed to have an identical schema,
+    and can be used to write to one or more Parquet files.
+
+    Args:
+        path: One or more paths to files with STAC items.
+        chunk_size: The chunk size. Defaults to 65536.
+        schema: The schema to represent the input STAC data. Defaults to None, in which
+            case the schema will first be inferred via a full pass over the input data.
+            In this case, there will be two full passes over the input data: one to
+            infer a common schema across all data and another to read the data.
+
+    Yields:
+        Arrow RecordBatch with a single chunk of Item data.
+    """
    # Define outside of if/else to make mypy happy
     items: List[dict] = []
 
-    # If the schema was not provided, then we need to load all data into memory at once
-    # to perform schema resolution.
+    # If the schema was not provided, first infer a common schema across the input,
+    # then stream over the input a second time using that schema.
     if schema is None:
-        with open(path) as f:
-            for line in f:
-                items.append(json.loads(line))
+        inferred_schema = InferredSchema()
+        inferred_schema.update_from_ndjson(path, chunk_size=chunk_size)
+        yield from parse_stac_ndjson_to_arrow(
+            path, chunk_size=chunk_size, schema=inferred_schema
+        )
+        return
+
+    # Check if path is an iterable
+    # If so, recursively call this function on each item in the iterable
+    if not isinstance(path, (str, Path)):
+        for p in path:
+            yield from parse_stac_ndjson_to_arrow(
+                p, chunk_size=chunk_size, schema=schema
+            )
+
+        return
 
-        return parse_stac_items_to_arrow(items, chunk_size=chunk_size, schema=schema)
+    if isinstance(schema, InferredSchema):
+        schema = schema.inner
 
     # Otherwise, we can stream over the input, converting each batch of `chunk_size`
     # into an Arrow RecordBatch at a time. This is much more memory efficient.
     with open(path) as f:
-        batches: List[pa.RecordBatch] = []
         for line in f:
             items.append(json.loads(line))
 
             if len(items) >= chunk_size:
-                batches.append(_stac_items_to_arrow(items, schema=schema))
+                batch = stac_items_to_arrow(items, schema=schema)
+                yield from _process_arrow_table(
+                    pa.Table.from_batches([batch]), downcast=False
+                ).to_batches()
                 items = []
 
     # Don't forget the last chunk in case the total number of items is not a multiple of
     # chunk_size.
     if len(items) > 0:
-        batches.append(_stac_items_to_arrow(items, schema=schema))
-
-    table = pa.Table.from_batches(batches, schema=schema)
-    return _process_arrow_table(table, downcast=downcast)
+        batch = stac_items_to_arrow(items, schema=schema)
+        yield from _process_arrow_table(
+            pa.Table.from_batches([batch]), downcast=False
+        ).to_batches()
 
 
 def _process_arrow_table(table: pa.Table, *, downcast: bool = True) -> pa.Table:
@@ -115,47 +162,6 @@
     return table
 
 
-def _stac_items_to_arrow(
-    items: Sequence[Dict[str, Any]], *, schema: Optional[pa.Schema] = None
-) -> pa.RecordBatch:
-    """Convert dicts representing STAC Items to Arrow
-
-    This converts GeoJSON geometries to WKB before Arrow conversion to allow multiple
-    geometry types.
-
-    All items will be parsed into a single RecordBatch, meaning that each internal array
-    is fully contiguous in memory for the length of `items`.
-
-    Args:
-        items: STAC Items to convert to Arrow
-
-    Kwargs:
-        schema: An optional schema that describes the format of the data. Note that this
-            must represent the geometry column as binary type.
-
-    Returns:
-        Arrow RecordBatch with items in Arrow
-    """
-    # Preprocess GeoJSON to WKB in each STAC item
-    # Otherwise, pyarrow will try to parse coordinates into a native geometry type and
-    # if you have multiple geometry types pyarrow will error with
-    # `ArrowInvalid: cannot mix list and non-list, non-null values`
-    wkb_items = []
-    for item in items:
-        wkb_item = deepcopy(item)
-        # Note: this mutates the existing items. Should we
-        wkb_item["geometry"] = shapely.to_wkb(
-            shapely.geometry.shape(wkb_item["geometry"]), flavor="iso"
-        )
-        wkb_items.append(wkb_item)
-
-    if schema is not None:
-        array = pa.array(wkb_items, type=pa.struct(schema))
-    else:
-        array = pa.array(wkb_items)
-    return pa.RecordBatch.from_struct_array(array)
-
-
 def _bring_properties_to_top_level(table: pa.Table) -> pa.Table:
     """Bring all the fields inside of the nested "properties" struct to the top level"""
     properties_field = table.schema.field("properties")
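With `parse_stac_ndjson_to_arrow` now returning an iterator, the conversion is streaming end to end, and the docstring's guarantee that every yielded batch shares one schema is what makes a single downstream Parquet writer possible. A consumption sketch (file names are illustrative):

```py
from stac_geoparquet.arrow._to_arrow import parse_stac_ndjson_to_arrow

batches = parse_stac_ndjson_to_arrow(["a.ndjson", "b.ndjson"], chunk_size=1_000)
first = next(batches)
for batch in batches:
    # Identical schemas across files and chunks, per the docstring guarantee
    assert batch.schema == first.schema
```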
diff --git a/stac_geoparquet/arrow/_to_parquet.py b/stac_geoparquet/arrow/_to_parquet.py
index 3641001..94da692 100644
--- a/stac_geoparquet/arrow/_to_parquet.py
+++ b/stac_geoparquet/arrow/_to_parquet.py
@@ -1,11 +1,44 @@
 import json
-from typing import Any
+from pathlib import Path
+from typing import Any, Iterable, Optional, Union
 
 import pyarrow as pa
 import pyarrow.parquet as pq
-from pyproj import CRS
 
-WGS84_CRS_JSON = CRS.from_epsg(4326).to_json_dict()
+from stac_geoparquet.arrow._schema.models import InferredSchema
+from stac_geoparquet.arrow._to_arrow import parse_stac_ndjson_to_arrow
+from stac_geoparquet.arrow._crs import WGS84_CRS_JSON
+
+
+def parse_stac_ndjson_to_parquet(
+    input_path: Union[Union[str, Path], Iterable[Union[str, Path]]],
+    output_path: Union[str, Path],
+    *,
+    chunk_size: int = 65536,
+    schema: Optional[Union[pa.Schema, InferredSchema]] = None,
+    **kwargs: Any,
+) -> None:
+    """Convert one or more newline-delimited JSON STAC files to GeoParquet
+
+    Args:
+        input_path: One or more paths to files with STAC items.
+        output_path: A path to the output Parquet file.
+        chunk_size: The chunk size. Defaults to 65536.
+        schema: The schema to represent the input STAC data. Defaults to None, in which
+            case the schema will first be inferred via a full pass over the input data.
+            In this case, there will be two full passes over the input data: one to
+            infer a common schema across all data and another to read the data and
+            iteratively convert to GeoParquet.
+    """
+    batches_iter = parse_stac_ndjson_to_arrow(
+        input_path, chunk_size=chunk_size, schema=schema
+    )
+    first_batch = next(batches_iter)
+    schema = first_batch.schema.with_metadata(_create_geoparquet_metadata())
+    with pq.ParquetWriter(output_path, schema, **kwargs) as writer:
+        writer.write_batch(first_batch)
+        for batch in batches_iter:
+            writer.write_batch(batch)
 
 
 def to_parquet(table: pa.Table, where: Any, **kwargs: Any) -> None:
@@ -17,6 +50,14 @@ def to_parquet(table: pa.Table, where: Any, **kwargs: Any) -> None:
         table: The table to write to Parquet
         where: The destination for saving.
     """
+    metadata = table.schema.metadata or {}
+    metadata.update(_create_geoparquet_metadata())
+    table = table.replace_schema_metadata(metadata)
+
+    pq.write_table(table, where, **kwargs)
+
+
+def _create_geoparquet_metadata() -> dict[bytes, bytes]:
     # TODO: include bbox of geometries
     column_meta = {
         "encoding": "WKB",
@@ -38,9 +79,4 @@ def to_parquet(table: pa.Table, where: Any, **kwargs: Any) -> None:
         "columns": {"geometry": column_meta},
         "primary_column": "geometry",
     }
-
-    metadata = table.schema.metadata or {}
-    metadata.update({b"geo": json.dumps(geo_meta).encode("utf-8")})
-    table = table.replace_schema_metadata(metadata)
-
-    pq.write_table(table, where, **kwargs)
+    return {b"geo": json.dumps(geo_meta).encode("utf-8")}
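`parse_stac_ndjson_to_parquet` peels off the first batch to build the file-level schema (with the GeoParquet `geo` metadata attached) and then streams the remaining batches through a single `ParquetWriter`, so only one chunk is in memory at a time. One observation: `next(batches_iter)` raises `StopIteration` if the input contains no items; an explicit error might be friendlier. A usage sketch (paths are illustrative):

```py
from stac_geoparquet.arrow._to_parquet import parse_stac_ndjson_to_parquet

# With schema=None this makes two passes over the input:
# one to infer a common schema, one to convert and write
parse_stac_ndjson_to_parquet("items.ndjson", "items.parquet")
```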
diff --git a/stac_geoparquet/arrow/_util.py b/stac_geoparquet/arrow/_util.py
new file mode 100644
index 0000000..8714064
--- /dev/null
+++ b/stac_geoparquet/arrow/_util.py
@@ -0,0 +1,63 @@
+from copy import deepcopy
+from typing import Any, Dict, Optional, Sequence
+
+import pyarrow as pa
+import shapely
+import shapely.geometry
+
+
+def stac_items_to_arrow(
+    items: Sequence[Dict[str, Any]], *, schema: Optional[pa.Schema] = None
+) -> pa.RecordBatch:
+    """Convert dicts representing STAC Items to Arrow
+
+    This converts GeoJSON geometries to WKB before Arrow conversion to allow multiple
+    geometry types.
+
+    All items will be parsed into a single RecordBatch, meaning that each internal array
+    is fully contiguous in memory for the length of `items`.
+
+    Args:
+        items: STAC Items to convert to Arrow
+
+    Kwargs:
+        schema: An optional schema that describes the format of the data. Note that this
+            must represent the geometry column as binary type.
+
+    Returns:
+        Arrow RecordBatch with items in Arrow
+    """
+    # Preprocess GeoJSON to WKB in each STAC item
+    # Otherwise, pyarrow will try to parse coordinates into a native geometry type and
+    # if you have multiple geometry types pyarrow will error with
+    # `ArrowInvalid: cannot mix list and non-list, non-null values`
+    wkb_items = []
+    for item in items:
+        wkb_item = deepcopy(item)
+        wkb_item["geometry"] = shapely.to_wkb(
+            shapely.geometry.shape(wkb_item["geometry"]), flavor="iso"
+        )
+
+        # If a proj:geometry key exists in top-level properties, convert that to WKB
+        if "proj:geometry" in wkb_item["properties"]:
+            wkb_item["properties"]["proj:geometry"] = shapely.to_wkb(
+                shapely.geometry.shape(wkb_item["properties"]["proj:geometry"]),
+                flavor="iso",
+            )
+
+        # If a proj:geometry key exists in any asset properties, convert that to WKB
+        for asset_value in wkb_item["assets"].values():
+            if "proj:geometry" in asset_value:
+                asset_value["proj:geometry"] = shapely.to_wkb(
+                    shapely.geometry.shape(asset_value["proj:geometry"]),
+                    flavor="iso",
+                )
+
+        wkb_items.append(wkb_item)
+
""" + metadata = table.schema.metadata or {} + metadata.update(_create_geoparquet_metadata()) + table = table.replace_schema_metadata(metadata) + + pq.write_table(table, where, **kwargs) + + +def _create_geoparquet_metadata() -> dict[bytes, bytes]: # TODO: include bbox of geometries column_meta = { "encoding": "WKB", @@ -38,9 +79,4 @@ def to_parquet(table: pa.Table, where: Any, **kwargs: Any) -> None: "columns": {"geometry": column_meta}, "primary_column": "geometry", } - - metadata = table.schema.metadata or {} - metadata.update({b"geo": json.dumps(geo_meta).encode("utf-8")}) - table = table.replace_schema_metadata(metadata) - - pq.write_table(table, where, **kwargs) + return {b"geo": json.dumps(geo_meta).encode("utf-8")} diff --git a/stac_geoparquet/arrow/_util.py b/stac_geoparquet/arrow/_util.py new file mode 100644 index 0000000..8714064 --- /dev/null +++ b/stac_geoparquet/arrow/_util.py @@ -0,0 +1,63 @@ +from copy import deepcopy +from typing import Any, Dict, Optional, Sequence + +import pyarrow as pa +import shapely +import shapely.geometry + + +def stac_items_to_arrow( + items: Sequence[Dict[str, Any]], *, schema: Optional[pa.Schema] = None +) -> pa.RecordBatch: + """Convert dicts representing STAC Items to Arrow + + This converts GeoJSON geometries to WKB before Arrow conversion to allow multiple + geometry types. + + All items will be parsed into a single RecordBatch, meaning that each internal array + is fully contiguous in memory for the length of `items`. + + Args: + items: STAC Items to convert to Arrow + + Kwargs: + schema: An optional schema that describes the format of the data. Note that this + must represent the geometry column as binary type. + + Returns: + Arrow RecordBatch with items in Arrow + """ + # Preprocess GeoJSON to WKB in each STAC item + # Otherwise, pyarrow will try to parse coordinates into a native geometry type and + # if you have multiple geometry types pyarrow will error with + # `ArrowInvalid: cannot mix list and non-list, non-null values` + wkb_items = [] + for item in items: + wkb_item = deepcopy(item) + wkb_item["geometry"] = shapely.to_wkb( + shapely.geometry.shape(wkb_item["geometry"]), flavor="iso" + ) + + # If a proj:geometry key exists in top-level properties, convert that to WKB + if "proj:geometry" in wkb_item["properties"]: + wkb_item["properties"]["proj:geometry"] = shapely.to_wkb( + shapely.geometry.shape(wkb_item["properties"]["proj:geometry"]), + flavor="iso", + ) + + # If a proj:geometry key exists in any asset properties, convert that to WKB + for asset_value in wkb_item["assets"].values(): + if "proj:geometry" in asset_value: + asset_value["proj:geometry"] = shapely.to_wkb( + shapely.geometry.shape(asset_value["proj:geometry"]), + flavor="iso", + ) + + wkb_items.append(wkb_item) + + if schema is not None: + array = pa.array(wkb_items, type=pa.struct(schema)) + else: + array = pa.array(wkb_items) + + return pa.RecordBatch.from_struct_array(array)