From 5406f96f2a18c452d7a656c54c08686bd9cf36c3 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Thu, 9 May 2024 17:22:53 -0400 Subject: [PATCH 1/7] exhaustive schema inference --- stac_geoparquet/arrow/_schema/models.py | 62 +++++++++++++++ stac_geoparquet/arrow/_to_arrow.py | 100 +++++++++--------------- stac_geoparquet/arrow/_to_parquet.py | 52 ++++++++++-- stac_geoparquet/arrow/_util.py | 62 +++++++++++++++ 4 files changed, 208 insertions(+), 68 deletions(-) create mode 100644 stac_geoparquet/arrow/_schema/models.py create mode 100644 stac_geoparquet/arrow/_util.py diff --git a/stac_geoparquet/arrow/_schema/models.py b/stac_geoparquet/arrow/_schema/models.py new file mode 100644 index 0000000..579f04c --- /dev/null +++ b/stac_geoparquet/arrow/_schema/models.py @@ -0,0 +1,62 @@ +import json +from pathlib import Path +from typing import Any, Dict, Iterable, List, Union + +import pyarrow as pa + +from stac_geoparquet.arrow._util import stac_items_to_arrow + + +class InferredSchema: + """ + A schema representing the original STAC JSON with absolutely minimal modifications. + + The only modification from the data is converting any geometry fields from GeoJSON + to WKB. + """ + + inner: pa.Schema + """The underlying Arrow schema.""" + + count: int + """The total number of items scanned.""" + + def __init__(self) -> None: + self.inner = pa.schema([]) + self.count = 0 + + def update_from_ndjson( + self, + path: Union[Union[str, Path], Iterable[Union[str, Path]]], + *, + chunk_size: int = 10000, + ): + # Handle multi-path input + if not isinstance(path, (str, Path)): + for p in path: + self.update_from_ndjson(p) + + return + + # Handle single-path input + with open(path) as f: + items = [] + for line in f: + item = json.loads(line) + items.append(item) + + if len(items) >= chunk_size: + self.update_from_items(items) + items = [] + + # Handle remainder + if len(items) > 0: + self.update_from_items(items) + + def update_from_items(self, items: List[Dict[str, Any]]): + self.count += len(items) + current_schema = stac_items_to_arrow(items, schema=None).schema + new_schema = pa.unify_schemas( + [self.inner, current_schema], promote_options="permissive" + ) + self.inner = new_schema diff --git a/stac_geoparquet/arrow/_to_arrow.py b/stac_geoparquet/arrow/_to_arrow.py index b5b1f06..7075452 100644 --- a/stac_geoparquet/arrow/_to_arrow.py +++ b/stac_geoparquet/arrow/_to_arrow.py @@ -1,10 +1,9 @@ """Convert STAC data into Arrow tables""" import json -from copy import deepcopy from datetime import datetime from pathlib import Path -from typing import Any, Dict, List, Optional, Sequence, Union, Generator +from typing import Any, Dict, Generator, Iterable, List, Optional, Sequence, Union import ciso8601 import numpy as np @@ -13,6 +12,9 @@ import shapely import shapely.geometry +from stac_geoparquet.arrow._schema.models import InferredSchema +from stac_geoparquet.arrow._util import stac_items_to_arrow + def _chunks( lst: Sequence[Dict[str, Any]], n: int @@ -26,8 +28,8 @@ def parse_stac_items_to_arrow( items: Sequence[Dict[str, Any]], *, chunk_size: int = 8192, - schema: Optional[pa.Schema] = None, - downcast: bool = True, + schema: Optional[Union[pa.Schema, InferredSchema]] = None, + downcast: bool = False, ) -> pa.Table: """Parse a collection of STAC Items to a :class:`pyarrow.Table`. 
@@ -49,11 +51,14 @@ def parse_stac_items_to_arrow( """ if schema is not None: + if isinstance(schema, InferredSchema): + schema = schema.inner + # If schema is provided, then for better memory usage we parse input STAC items # to Arrow batches in chunks. batches = [] for chunk in _chunks(items, chunk_size): - batches.append(_stac_items_to_arrow(chunk, schema=schema)) + batches.append(stac_items_to_arrow(chunk, schema=schema)) table = pa.Table.from_batches(batches, schema=schema) else: @@ -61,48 +66,62 @@ def parse_stac_items_to_arrow( # else it would be possible for a STAC item late in the collection (after the # first chunk) to have a different schema and not match the schema inferred for # the first chunk. - table = pa.Table.from_batches([_stac_items_to_arrow(items)]) + table = pa.Table.from_batches([stac_items_to_arrow(items)]) return _process_arrow_table(table, downcast=downcast) def parse_stac_ndjson_to_arrow( - path: Union[str, Path], + path: Union[Union[str, Path], Iterable[Union[str, Path]]], *, chunk_size: int = 8192, - schema: Optional[pa.Schema] = None, - downcast: bool = True, -) -> pa.Table: + schema: Optional[Union[pa.Schema, InferredSchema]] = None, +): # Define outside of if/else to make mypy happy items: List[dict] = [] # If the schema was not provided, then we need to load all data into memory at once # to perform schema resolution. if schema is None: - with open(path) as f: - for line in f: - items.append(json.loads(line)) + inferred_schema = InferredSchema() + inferred_schema.update_from_ndjson(path, chunk_size=chunk_size) + return parse_stac_ndjson_to_arrow( + path, chunk_size=chunk_size, schema=inferred_schema + ) + + # Check if path is an iterable + # If so, recursively call this function on each item in the iterable + if not isinstance(path, (str, Path)): + for p in path: + yield from parse_stac_ndjson_to_arrow( + p, chunk_size=chunk_size, schema=schema + ) - return parse_stac_items_to_arrow(items, chunk_size=chunk_size, schema=schema) + return + + if isinstance(schema, InferredSchema): + schema = schema.inner # Otherwise, we can stream over the input, converting each batch of `chunk_size` # into an Arrow RecordBatch at a time. This is much more memory efficient. with open(path) as f: - batches: List[pa.RecordBatch] = [] for line in f: items.append(json.loads(line)) if len(items) >= chunk_size: - batches.append(_stac_items_to_arrow(items, schema=schema)) + batch = stac_items_to_arrow(items, schema=schema) + yield from _process_arrow_table( + pa.Table.from_batches([batch]), downcast=False + ).to_batches() items = [] # Don't forget the last chunk in case the total number of items is not a multiple of # chunk_size. if len(items) > 0: - batches.append(_stac_items_to_arrow(items, schema=schema)) - - table = pa.Table.from_batches(batches, schema=schema) - return _process_arrow_table(table, downcast=downcast) + batch = stac_items_to_arrow(items, schema=schema) + yield from _process_arrow_table( + pa.Table.from_batches([batch]), downcast=False + ).to_batches() def _process_arrow_table(table: pa.Table, *, downcast: bool = True) -> pa.Table: @@ -112,47 +131,6 @@ def _process_arrow_table(table: pa.Table, *, downcast: bool = True) -> pa.Table: return table -def _stac_items_to_arrow( - items: Sequence[Dict[str, Any]], *, schema: Optional[pa.Schema] = None -) -> pa.RecordBatch: - """Convert dicts representing STAC Items to Arrow - - This converts GeoJSON geometries to WKB before Arrow conversion to allow multiple - geometry types. 
- - All items will be parsed into a single RecordBatch, meaning that each internal array - is fully contiguous in memory for the length of `items`. - - Args: - items: STAC Items to convert to Arrow - - Kwargs: - schema: An optional schema that describes the format of the data. Note that this - must represent the geometry column as binary type. - - Returns: - Arrow RecordBatch with items in Arrow - """ - # Preprocess GeoJSON to WKB in each STAC item - # Otherwise, pyarrow will try to parse coordinates into a native geometry type and - # if you have multiple geometry types pyarrow will error with - # `ArrowInvalid: cannot mix list and non-list, non-null values` - wkb_items = [] - for item in items: - wkb_item = deepcopy(item) - # Note: this mutates the existing items. Should we - wkb_item["geometry"] = shapely.to_wkb( - shapely.geometry.shape(wkb_item["geometry"]), flavor="iso" - ) - wkb_items.append(wkb_item) - - if schema is not None: - array = pa.array(wkb_items, type=pa.struct(schema)) - else: - array = pa.array(wkb_items) - return pa.RecordBatch.from_struct_array(array) - - def _bring_properties_to_top_level(table: pa.Table) -> pa.Table: """Bring all the fields inside of the nested "properties" struct to the top level""" properties_field = table.schema.field("properties") diff --git a/stac_geoparquet/arrow/_to_parquet.py b/stac_geoparquet/arrow/_to_parquet.py index 3641001..16a6d99 100644 --- a/stac_geoparquet/arrow/_to_parquet.py +++ b/stac_geoparquet/arrow/_to_parquet.py @@ -1,13 +1,48 @@ import json -from typing import Any +from pathlib import Path +from typing import Any, Iterable, Optional, Union import pyarrow as pa import pyarrow.parquet as pq from pyproj import CRS +from stac_geoparquet.arrow._schema.models import InferredSchema +from stac_geoparquet.arrow._to_arrow import parse_stac_ndjson_to_arrow + WGS84_CRS_JSON = CRS.from_epsg(4326).to_json_dict() +def parse_stac_ndjson_to_parquet( + input_path: Union[Union[str, Path], Iterable[Union[str, Path]]], + output_path: Union[str, Path], + *, + chunk_size: int = 65536, + schema: Optional[Union[pa.Schema, InferredSchema]] = None, + **kwargs, +): + """Convert one or more newline-delimited JSON STAC files to GeoParquet + + Args: + input_path: One or more paths to files with STAC items. + output_path: A path to the output Parquet file. + chunk_size: The chunk size. Defaults to 65536. + schema: The schema to represent the input STAC data. Defaults to None, in which + case the schema will be inferred via a full pass over the input data. In + this case, there will be two full passes over the input data: one to infer a + common schema across all data and another to read the data and iteratively + convert to GeoParquet. + """ + batches_iter = parse_stac_ndjson_to_arrow( + input_path, chunk_size=chunk_size, schema=schema + ) + first_batch = next(batches_iter) + schema = first_batch.schema.with_metadata(_create_geoparquet_metadata()) + with pq.ParquetWriter(output_path, schema, **kwargs) as writer: + writer.write_batch(first_batch) + for batch in batches_iter: + writer.write_batch(batch) + + def to_parquet(table: pa.Table, where: Any, **kwargs: Any) -> None: """Write an Arrow table with STAC data to GeoParquet @@ -17,6 +52,14 @@ def to_parquet(table: pa.Table, where: Any, **kwargs: Any) -> None: table: The table to write to Parquet where: The destination for saving. 
""" + metadata = table.schema.metadata or {} + metadata.update(_create_geoparquet_metadata()) + table = table.replace_schema_metadata(metadata) + + pq.write_table(table, where, **kwargs) + + +def _create_geoparquet_metadata(): # TODO: include bbox of geometries column_meta = { "encoding": "WKB", @@ -38,9 +81,4 @@ def to_parquet(table: pa.Table, where: Any, **kwargs: Any) -> None: "columns": {"geometry": column_meta}, "primary_column": "geometry", } - - metadata = table.schema.metadata or {} - metadata.update({b"geo": json.dumps(geo_meta).encode("utf-8")}) - table = table.replace_schema_metadata(metadata) - - pq.write_table(table, where, **kwargs) + return {b"geo": json.dumps(geo_meta).encode("utf-8")} diff --git a/stac_geoparquet/arrow/_util.py b/stac_geoparquet/arrow/_util.py new file mode 100644 index 0000000..25d948d --- /dev/null +++ b/stac_geoparquet/arrow/_util.py @@ -0,0 +1,62 @@ +from copy import deepcopy +from typing import Any, Dict, Optional, Sequence + +import pyarrow as pa +import shapely +import shapely.geometry + + +def stac_items_to_arrow( + items: Sequence[Dict[str, Any]], *, schema: Optional[pa.Schema] = None +) -> pa.RecordBatch: + """Convert dicts representing STAC Items to Arrow + + This converts GeoJSON geometries to WKB before Arrow conversion to allow multiple + geometry types. + + All items will be parsed into a single RecordBatch, meaning that each internal array + is fully contiguous in memory for the length of `items`. + + Args: + items: STAC Items to convert to Arrow + + Kwargs: + schema: An optional schema that describes the format of the data. Note that this + must represent the geometry column as binary type. + + Returns: + Arrow RecordBatch with items in Arrow + """ + # Preprocess GeoJSON to WKB in each STAC item + # Otherwise, pyarrow will try to parse coordinates into a native geometry type and + # if you have multiple geometry types pyarrow will error with + # `ArrowInvalid: cannot mix list and non-list, non-null values` + wkb_items = [] + for item in items: + wkb_item = deepcopy(item) + wkb_item["geometry"] = shapely.to_wkb( + shapely.geometry.shape(wkb_item["geometry"]), flavor="iso" + ) + + # If a proj:geometry key exists in top-level properties, convert that to WKB + if "proj:geometry" in wkb_item["properties"]: + wkb_item["proj:geometry"] = shapely.to_wkb( + shapely.geometry.shape(wkb_item["proj:geometry"]), flavor="iso" + ) + + # If a proj:geometry key exists in any asset properties, convert that to WKB + for asset_value in wkb_item["assets"].values(): + if "proj:geometry" in asset_value: + asset_value["proj:geometry"] = shapely.to_wkb( + shapely.geometry.shape(asset_value["proj:geometry"]), + flavor="iso", + ) + + wkb_items.append(wkb_item) + + if schema is not None: + array = pa.array(wkb_items, type=pa.struct(schema)) + else: + array = pa.array(wkb_items) + + return pa.RecordBatch.from_struct_array(array) From 42e84bdc3b7bef49bea0e984af9ee64850f61f21 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Thu, 9 May 2024 17:30:39 -0400 Subject: [PATCH 2/7] Update docstrings --- stac_geoparquet/arrow/_schema/models.py | 14 +++++++++++--- stac_geoparquet/arrow/_to_arrow.py | 20 +++++++++++++++++++- stac_geoparquet/arrow/_to_parquet.py | 8 ++++---- 3 files changed, 34 insertions(+), 8 deletions(-) diff --git a/stac_geoparquet/arrow/_schema/models.py b/stac_geoparquet/arrow/_schema/models.py index 579f04c..51c6d7b 100644 --- a/stac_geoparquet/arrow/_schema/models.py +++ b/stac_geoparquet/arrow/_schema/models.py @@ -1,6 +1,6 @@ import json from pathlib 
import Path -from typing import Any, Dict, Iterable, List, Union +from typing import Any, Dict, Iterable, Sequence, Union import pyarrow as pa @@ -29,8 +29,15 @@ def update_from_ndjson( self, path: Union[Union[str, Path], Iterable[Union[str, Path]]], *, - chunk_size: int = 10000, + chunk_size: int = 65536, ): + """ + Update this inferred schema from one or more newline-delimited JSON STAC files. + + Args: + path: One or more paths to files with STAC items. + chunk_size: The chunk size to load into memory at a time. Defaults to 65536. + """ # Handle multi-path input if not isinstance(path, (str, Path)): for p in path: @@ -53,7 +60,8 @@ def update_from_ndjson( if len(items) > 0: self.update_from_items(items) - def update_from_items(self, items: List[Dict[str, Any]]): + def update_from_items(self, items: Sequence[Dict[str, Any]]): + """Update this inferred schema from a sequence of STAC Items.""" self.count += len(items) current_schema = stac_items_to_arrow(items, schema=None).schema new_schema = pa.unify_schemas( diff --git a/stac_geoparquet/arrow/_to_arrow.py b/stac_geoparquet/arrow/_to_arrow.py index 7075452..d25ad80 100644 --- a/stac_geoparquet/arrow/_to_arrow.py +++ b/stac_geoparquet/arrow/_to_arrow.py @@ -74,9 +74,27 @@ def parse_stac_items_to_arrow( def parse_stac_ndjson_to_arrow( path: Union[Union[str, Path], Iterable[Union[str, Path]]], *, - chunk_size: int = 8192, + chunk_size: int = 65536, schema: Optional[Union[pa.Schema, InferredSchema]] = None, ): + """ + Convert one or more newline-delimited JSON STAC files to a generator of Arrow + RecordBatches. + + Each RecordBatch in the returned iterator is guaranteed to have an identical schema, + and can be used to write to one or more Parquet files. + + Args: + path: One or more paths to files with STAC items. + chunk_size: The chunk size. Defaults to 65536. + schema: The schema to represent the input STAC data. Defaults to None, in which + case the schema will first be inferred via a full pass over the input data. + In this case, there will be two full passes over the input data: one to + infer a common schema across all data and another to read the data. + + Yields: + Arrow RecordBatch with a single chunk of Item data. + """ # Define outside of if/else to make mypy happy items: List[dict] = [] diff --git a/stac_geoparquet/arrow/_to_parquet.py b/stac_geoparquet/arrow/_to_parquet.py index 16a6d99..1cf5beb 100644 --- a/stac_geoparquet/arrow/_to_parquet.py +++ b/stac_geoparquet/arrow/_to_parquet.py @@ -27,10 +27,10 @@ def parse_stac_ndjson_to_parquet( output_path: A path to the output Parquet file. chunk_size: The chunk size. Defaults to 65536. schema: The schema to represent the input STAC data. Defaults to None, in which - case the schema will be inferred via a full pass over the input data. In - this case, there will be two full passes over the input data: one to infer a - common schema across all data and another to read the data and iteratively - convert to GeoParquet. + case the schema will first be inferred via a full pass over the input data. + In this case, there will be two full passes over the input data: one to + infer a common schema across all data and another to read the data and + iteratively convert to GeoParquet. 
""" batches_iter = parse_stac_ndjson_to_arrow( input_path, chunk_size=chunk_size, schema=schema From 520cb332fd892d95614f9eb2a8008993a96e0ef2 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Thu, 9 May 2024 18:18:23 -0400 Subject: [PATCH 3/7] fix dict accessor --- stac_geoparquet/arrow/_to_arrow.py | 3 ++- stac_geoparquet/arrow/_util.py | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/stac_geoparquet/arrow/_to_arrow.py b/stac_geoparquet/arrow/_to_arrow.py index d25ad80..3686e76 100644 --- a/stac_geoparquet/arrow/_to_arrow.py +++ b/stac_geoparquet/arrow/_to_arrow.py @@ -103,9 +103,10 @@ def parse_stac_ndjson_to_arrow( if schema is None: inferred_schema = InferredSchema() inferred_schema.update_from_ndjson(path, chunk_size=chunk_size) - return parse_stac_ndjson_to_arrow( + yield from parse_stac_ndjson_to_arrow( path, chunk_size=chunk_size, schema=inferred_schema ) + return # Check if path is an iterable # If so, recursively call this function on each item in the iterable diff --git a/stac_geoparquet/arrow/_util.py b/stac_geoparquet/arrow/_util.py index 25d948d..c02a294 100644 --- a/stac_geoparquet/arrow/_util.py +++ b/stac_geoparquet/arrow/_util.py @@ -41,7 +41,8 @@ def stac_items_to_arrow( # If a proj:geometry key exists in top-level properties, convert that to WKB if "proj:geometry" in wkb_item["properties"]: wkb_item["proj:geometry"] = shapely.to_wkb( - shapely.geometry.shape(wkb_item["proj:geometry"]), flavor="iso" + shapely.geometry.shape(wkb_item["properties"]["proj:geometry"]), + flavor="iso", ) # If a proj:geometry key exists in any asset properties, convert that to WKB From 2b445e78c4297ad0ef5ff0bbbbe7535ceb095e20 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Thu, 9 May 2024 18:21:23 -0400 Subject: [PATCH 4/7] fix setter --- stac_geoparquet/arrow/_util.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stac_geoparquet/arrow/_util.py b/stac_geoparquet/arrow/_util.py index c02a294..8714064 100644 --- a/stac_geoparquet/arrow/_util.py +++ b/stac_geoparquet/arrow/_util.py @@ -40,7 +40,7 @@ def stac_items_to_arrow( # If a proj:geometry key exists in top-level properties, convert that to WKB if "proj:geometry" in wkb_item["properties"]: - wkb_item["proj:geometry"] = shapely.to_wkb( + wkb_item["properties"]["proj:geometry"] = shapely.to_wkb( shapely.geometry.shape(wkb_item["properties"]["proj:geometry"]), flavor="iso", ) From 8bf171ce445501673beefbeccadc6d75a1209cb9 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Fri, 10 May 2024 14:44:51 -0400 Subject: [PATCH 5/7] Fix Arrow -> STAC conversion for nested proj:geometry --- stac_geoparquet/arrow/_from_arrow.py | 107 ++++++++++++++++++++++++--- 1 file changed, 95 insertions(+), 12 deletions(-) diff --git a/stac_geoparquet/arrow/_from_arrow.py b/stac_geoparquet/arrow/_from_arrow.py index f940864..e4cc329 100644 --- a/stac_geoparquet/arrow/_from_arrow.py +++ b/stac_geoparquet/arrow/_from_arrow.py @@ -1,13 +1,17 @@ """Convert STAC Items in Arrow Table format to JSON Lines or Python dicts.""" -import os import json -from typing import Iterable, List, Union +import operator +import os +from functools import reduce +from typing import Iterable, List, Sequence, Union import numpy as np import pyarrow as pa import pyarrow.compute as pc import shapely +from numpy.typing import NDArray +import shapely.geometry def stac_table_to_ndjson(table: pa.Table, dest: Union[str, os.PathLike[str]]) -> None: @@ -22,20 +26,46 @@ def stac_table_to_items(table: pa.Table) -> Iterable[dict]: """Convert a STAC Table to a 
generator of STAC Item `dict`s""" table = _undo_stac_table_transformations(table) - # Convert WKB geometry column to GeoJSON, and then assign the geojson geometry when - # converting each row to a dictionary. - for batch in table.to_batches(): - geoms = shapely.from_wkb(batch["geometry"]) - geojson_strings = shapely.to_geojson(geoms) + # Find all paths in the schema that have a WKB geometry + geometry_paths = [["geometry"]] + try: + table.schema.field("properties").type.field("proj:geometry") + geometry_paths.append(["properties", "proj:geometry"]) + except KeyError: + pass - # RecordBatch is missing a `drop()` method, so we keep all columns other than - # geometry instead - keep_column_names = [name for name in batch.column_names if name != "geometry"] - struct_batch = batch.select(keep_column_names).to_struct_array() + assets_struct = table.schema.field("assets").type + for asset_idx in range(assets_struct.num_fields): + asset_field = assets_struct.field(asset_idx) + if "proj:geometry" in pa.schema(asset_field).names: + geometry_paths.append(["assets", asset_field.name, "proj:geometry"]) + for batch in table.to_batches(): + # Convert each geometry column to a Shapely geometry, and then assign the + # geojson geometry when converting each row to a dictionary. + geometries: List[NDArray[np.object_]] = [] + for geometry_path in geometry_paths: + col = batch + for path_segment in geometry_path: + if isinstance(col, pa.RecordBatch): + col = col[path_segment] + elif pa.types.is_struct(col.type): + col = pc.struct_field(col, path_segment) + else: + raise AssertionError(f"unexpected type {type(col)}") + + geometries.append(shapely.from_wkb(col)) + + struct_batch = batch.to_struct_array() for row_idx in range(len(struct_batch)): row_dict = struct_batch[row_idx].as_py() - row_dict["geometry"] = json.loads(geojson_strings[row_idx]) + for geometry_path, geometry_column in zip(geometry_paths, geometries): + geojson_g = geometry_column[row_idx].__geo_interface__ + geojson_g["coordinates"] = convert_tuples_to_lists( + geojson_g["coordinates"] + ) + set_by_path(row_dict, geometry_path, geojson_g) + yield row_dict @@ -164,3 +194,56 @@ def _convert_bbox_to_array(table: pa.Table) -> pa.Table: new_chunks.append(list_arr) return table.set_column(bbox_col_idx, "bbox", new_chunks) + + +def convert_tuples_to_lists(t: Sequence): + """Convert tuples to lists, recursively + + For example, converts: + ``` + ( + ( + (-112.4820566, 38.1261015), + (-112.4816283, 38.1331311), + (-112.4833551, 38.1338897), + (-112.4832919, 38.1307687), + (-112.4855415, 38.1291793), + (-112.4820566, 38.1261015), + ), + ) + ``` + + to + + ```py + [ + [ + [-112.4820566, 38.1261015], + [-112.4816283, 38.1331311], + [-112.4833551, 38.1338897], + [-112.4832919, 38.1307687], + [-112.4855415, 38.1291793], + [-112.4820566, 38.1261015], + ] + ] + ``` + + From https://stackoverflow.com/a/1014669. + """ + return list(map(convert_tuples_to_lists, t)) if isinstance(t, (list, tuple)) else t + + +def get_by_path(root, items): + """Access a nested object in root by item sequence. + + From https://stackoverflow.com/a/14692747 + """ + return reduce(operator.getitem, items, root) + + +def set_by_path(root, items, value): + """Set a value in a nested object in root by item sequence. 
+ + From https://stackoverflow.com/a/14692747 + """ + get_by_path(root, items[:-1])[items[-1]] = value # type: ignore From 14eb065c546477df581734eb60f8d5a3ff1bb919 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Fri, 10 May 2024 14:58:36 -0400 Subject: [PATCH 6/7] fix types --- stac_geoparquet/arrow/_from_arrow.py | 12 ++++++------ stac_geoparquet/arrow/_schema/models.py | 4 ++-- stac_geoparquet/arrow/_to_arrow.py | 14 ++++++++++++-- stac_geoparquet/arrow/_to_parquet.py | 6 +++--- 4 files changed, 23 insertions(+), 13 deletions(-) diff --git a/stac_geoparquet/arrow/_from_arrow.py b/stac_geoparquet/arrow/_from_arrow.py index e4cc329..ec717ae 100644 --- a/stac_geoparquet/arrow/_from_arrow.py +++ b/stac_geoparquet/arrow/_from_arrow.py @@ -4,7 +4,7 @@ import operator import os from functools import reduce -from typing import Iterable, List, Sequence, Union +from typing import Any, Dict, Iterable, List, Sequence, Tuple, Union import numpy as np import pyarrow as pa @@ -196,7 +196,7 @@ def _convert_bbox_to_array(table: pa.Table) -> pa.Table: return table.set_column(bbox_col_idx, "bbox", new_chunks) -def convert_tuples_to_lists(t: Sequence): +def convert_tuples_to_lists(t: List | Tuple) -> List[Any]: """Convert tuples to lists, recursively For example, converts: @@ -233,17 +233,17 @@ def convert_tuples_to_lists(t: Sequence): return list(map(convert_tuples_to_lists, t)) if isinstance(t, (list, tuple)) else t -def get_by_path(root, items): +def get_by_path(root: Dict[str, Any], keys: Sequence[str]) -> Any: """Access a nested object in root by item sequence. From https://stackoverflow.com/a/14692747 """ - return reduce(operator.getitem, items, root) + return reduce(operator.getitem, keys, root) -def set_by_path(root, items, value): +def set_by_path(root: Dict[str, Any], keys: Sequence[str], value: Any) -> None: """Set a value in a nested object in root by item sequence. From https://stackoverflow.com/a/14692747 """ - get_by_path(root, items[:-1])[items[-1]] = value # type: ignore + get_by_path(root, keys[:-1])[keys[-1]] = value # type: ignore diff --git a/stac_geoparquet/arrow/_schema/models.py b/stac_geoparquet/arrow/_schema/models.py index 51c6d7b..c3c0ecb 100644 --- a/stac_geoparquet/arrow/_schema/models.py +++ b/stac_geoparquet/arrow/_schema/models.py @@ -30,7 +30,7 @@ def update_from_ndjson( path: Union[Union[str, Path], Iterable[Union[str, Path]]], *, chunk_size: int = 65536, - ): + ) -> None: """ Update this inferred schema from one or more newline-delimited JSON STAC files. 
@@ -60,7 +60,7 @@ def update_from_ndjson( if len(items) > 0: self.update_from_items(items) - def update_from_items(self, items: Sequence[Dict[str, Any]]): + def update_from_items(self, items: Sequence[Dict[str, Any]]) -> None: """Update this inferred schema from a sequence of STAC Items.""" self.count += len(items) current_schema = stac_items_to_arrow(items, schema=None).schema diff --git a/stac_geoparquet/arrow/_to_arrow.py b/stac_geoparquet/arrow/_to_arrow.py index 3686e76..ee1deec 100644 --- a/stac_geoparquet/arrow/_to_arrow.py +++ b/stac_geoparquet/arrow/_to_arrow.py @@ -3,7 +3,17 @@ import json from datetime import datetime from pathlib import Path -from typing import Any, Dict, Generator, Iterable, List, Optional, Sequence, Union +from typing import ( + Any, + Dict, + Generator, + Iterable, + Iterator, + List, + Optional, + Sequence, + Union, +) import ciso8601 import numpy as np @@ -76,7 +86,7 @@ def parse_stac_ndjson_to_arrow( *, chunk_size: int = 65536, schema: Optional[Union[pa.Schema, InferredSchema]] = None, -): +) -> Iterator[pa.RecordBatch]: """ Convert one or more newline-delimited JSON STAC files to a generator of Arrow RecordBatches. diff --git a/stac_geoparquet/arrow/_to_parquet.py b/stac_geoparquet/arrow/_to_parquet.py index 1cf5beb..8acf820 100644 --- a/stac_geoparquet/arrow/_to_parquet.py +++ b/stac_geoparquet/arrow/_to_parquet.py @@ -18,8 +18,8 @@ def parse_stac_ndjson_to_parquet( *, chunk_size: int = 65536, schema: Optional[Union[pa.Schema, InferredSchema]] = None, - **kwargs, -): + **kwargs: Any, +) -> None: """Convert one or more newline-delimited JSON STAC files to GeoParquet Args: @@ -59,7 +59,7 @@ def to_parquet(table: pa.Table, where: Any, **kwargs: Any) -> None: pq.write_table(table, where, **kwargs) -def _create_geoparquet_metadata(): +def _create_geoparquet_metadata() -> dict[bytes, bytes]: # TODO: include bbox of geometries column_meta = { "encoding": "WKB", From ad6121ab57af6b607f327ba6111df57f106e64af Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Wed, 15 May 2024 12:26:41 -0400 Subject: [PATCH 7/7] fix circular import --- stac_geoparquet/arrow/_crs.py | 3 +++ stac_geoparquet/arrow/_to_arrow.py | 2 +- stac_geoparquet/arrow/_to_parquet.py | 4 +--- 3 files changed, 5 insertions(+), 4 deletions(-) create mode 100644 stac_geoparquet/arrow/_crs.py diff --git a/stac_geoparquet/arrow/_crs.py b/stac_geoparquet/arrow/_crs.py new file mode 100644 index 0000000..2c72ffd --- /dev/null +++ b/stac_geoparquet/arrow/_crs.py @@ -0,0 +1,3 @@ +from pyproj import CRS + +WGS84_CRS_JSON = CRS.from_epsg(4326).to_json_dict() diff --git a/stac_geoparquet/arrow/_to_arrow.py b/stac_geoparquet/arrow/_to_arrow.py index d09ce90..0918d34 100644 --- a/stac_geoparquet/arrow/_to_arrow.py +++ b/stac_geoparquet/arrow/_to_arrow.py @@ -23,7 +23,7 @@ import shapely.geometry from stac_geoparquet.arrow._schema.models import InferredSchema -from stac_geoparquet.arrow._to_parquet import WGS84_CRS_JSON +from stac_geoparquet.arrow._crs import WGS84_CRS_JSON from stac_geoparquet.arrow._util import stac_items_to_arrow diff --git a/stac_geoparquet/arrow/_to_parquet.py b/stac_geoparquet/arrow/_to_parquet.py index 8acf820..94da692 100644 --- a/stac_geoparquet/arrow/_to_parquet.py +++ b/stac_geoparquet/arrow/_to_parquet.py @@ -4,12 +4,10 @@ import pyarrow as pa import pyarrow.parquet as pq -from pyproj import CRS from stac_geoparquet.arrow._schema.models import InferredSchema from stac_geoparquet.arrow._to_arrow import parse_stac_ndjson_to_arrow - -WGS84_CRS_JSON = CRS.from_epsg(4326).to_json_dict() 
+from stac_geoparquet.arrow._crs import WGS84_CRS_JSON def parse_stac_ndjson_to_parquet(
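A minimal usage sketch of the API introduced across this series (illustrative only, not part of the diffs above). The import paths below are the private modules touched by these patches; whether the names are also re-exported from `stac_geoparquet.arrow` is not shown in this series, and the `*.ndjson` / `*.parquet` paths are placeholders.

    from stac_geoparquet.arrow._schema.models import InferredSchema
    from stac_geoparquet.arrow._to_arrow import parse_stac_ndjson_to_arrow
    from stac_geoparquet.arrow._to_parquet import parse_stac_ndjson_to_parquet

    # One-shot conversion: the schema is inferred in a first pass over the
    # input, then the items are streamed to GeoParquet in a second pass.
    parse_stac_ndjson_to_parquet("items.ndjson", "items.parquet")

    # Or infer the schema once and reuse it, e.g. when one collection is split
    # across several newline-delimited JSON files that must share a schema.
    paths = ["part-1.ndjson", "part-2.ndjson"]  # placeholder paths
    inferred = InferredSchema()
    inferred.update_from_ndjson(paths)

    # Every yielded pyarrow.RecordBatch carries the same unified schema.
    for batch in parse_stac_ndjson_to_arrow(paths, schema=inferred):
        ...

    parse_stac_ndjson_to_parquet(paths, "combined.parquet", schema=inferred)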