Write to Delta Lake #58
Merged
Commits (17, all by kylebarron):

54d625c  Move json equality logic outside of test_arrow
a0433c5  Refactor to RawBatch and CleanBatch wrapper types
7fabc9a  Move _from_arrow functions to _api
46295d0  Update imports
fa226d4  fix circular import
cc7beec  keep deprecated api
6060644  Add write-read test and fix typing
4c5d08b  add parquet tests
14a6bc9  fix ci
c4d712b  Initial delta lake support
0ccceca  Manual schema updates
612a8eb  Add delta lake dep
4907d4c  Merge branch 'main' into kyle/delta-lake-interop
baae396  Fix export
66eec94  fix pyupgrade lint
c724bc5  Add type hints
6a9704b  any typing
stac_geoparquet/arrow/_delta_lake.py (new file, +34 lines):

from __future__ import annotations

import itertools
from pathlib import Path
from typing import TYPE_CHECKING, Any, Iterable

import pyarrow as pa
from deltalake import write_deltalake

from stac_geoparquet.arrow._api import parse_stac_ndjson_to_arrow
from stac_geoparquet.arrow._to_parquet import create_geoparquet_metadata

if TYPE_CHECKING:
    from deltalake import DeltaTable


def parse_stac_ndjson_to_delta_lake(
    input_path: str | Path | Iterable[str | Path],
    table_or_uri: str | Path | DeltaTable,
    *,
    chunk_size: int = 65536,
    schema: pa.Schema | None = None,
    limit: int | None = None,
    **kwargs: Any,
) -> None:
    batches_iter = parse_stac_ndjson_to_arrow(
        input_path, chunk_size=chunk_size, schema=schema, limit=limit
    )
    # Pull the first batch so its schema (with GeoParquet metadata attached)
    # can be passed to write_deltalake, then chain it back onto the iterator
    # so no data is lost.
    first_batch = next(batches_iter)
    schema = first_batch.schema.with_metadata(
        create_geoparquet_metadata(pa.Table.from_batches([first_batch]))
    )
    combined_iter = itertools.chain([first_batch], batches_iter)
    write_deltalake(table_or_uri, combined_iter, schema=schema, engine="rust", **kwargs)
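For context, a minimal usage sketch of the new function. The input path, output location, and chunk size here are hypothetical, assuming items.ndjson holds one STAC item per line:

from pathlib import Path

from stac_geoparquet.arrow._delta_lake import parse_stac_ndjson_to_delta_lake

# Hypothetical paths: items.ndjson is newline-delimited STAC JSON, and
# ./stac-items-delta is where the Delta Lake table will be created.
parse_stac_ndjson_to_delta_lake(
    Path("items.ndjson"),
    "./stac-items-delta",
    chunk_size=1024,  # smaller batches than the 65536 default, for a small file
)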
tests/test_delta_lake.py (new file, +46 lines):

import json
from pathlib import Path

import pytest
from deltalake import DeltaTable

from stac_geoparquet.arrow import stac_table_to_items
from stac_geoparquet.arrow._delta_lake import parse_stac_ndjson_to_delta_lake

from .json_equals import assert_json_value_equal

HERE = Path(__file__).parent

TEST_COLLECTIONS = [
    "3dep-lidar-copc",
    # "3dep-lidar-dsm",
    "cop-dem-glo-30",
    "io-lulc-annual-v02",
    # "io-lulc",
    "landsat-c2-l1",
    "landsat-c2-l2",
    "naip",
    "planet-nicfi-analytic",
    "sentinel-1-rtc",
    "sentinel-2-l2a",
    "us-census",
]


@pytest.mark.parametrize("collection_id", TEST_COLLECTIONS)
def test_round_trip_via_delta_lake(collection_id: str, tmp_path: Path):
    # Write the collection's items to a Delta Lake table in a temp directory
    path = HERE / "data" / f"{collection_id}-pc.json"
    out_path = tmp_path / collection_id
    parse_stac_ndjson_to_delta_lake(path, out_path)

    # Read back into table and convert to json
    dt = DeltaTable(out_path)
    table = dt.to_pyarrow_table()
    items_result = list(stac_table_to_items(table))

    # Compare with original json
    with open(HERE / "data" / f"{collection_id}-pc.json") as f:
        items = json.load(f)

    for result, expected in zip(items_result, items):
        assert_json_value_equal(result, expected, precision=0)
Reviewer: should we make a generic `arrow_batches_to_delta_lake` that takes in batches and then just make the `parse_stac_ndjson_to_delta_lake` a wrapper around that?
Reply: well, that generic `arrow_batches_to_delta_lake` function is literally just `write_deltalake`. You can pass an `Iterable[pa.RecordBatch]` directly to it. (You just also need to know the schema separately. Are you suggesting a helper that takes the first batch and passes its schema to `write_deltalake`?)
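To illustrate the point, a minimal sketch of passing a RecordBatch iterator straight to write_deltalake. The schema and data are made up for the example, and it assumes a deltalake version whose Rust engine accepts an iterable of batches plus an explicit schema, which is what the new code above relies on:

import pyarrow as pa
from deltalake import write_deltalake

schema = pa.schema([("id", pa.string()), ("value", pa.int64())])

def batches():
    # Hypothetical data; in practice these batches would come from
    # parse_stac_ndjson_to_arrow or any other RecordBatch source.
    for i in range(3):
        yield pa.RecordBatch.from_arrays(
            [pa.array([str(i)]), pa.array([i])], schema=schema
        )

# No extra wrapper needed: the iterator goes directly to write_deltalake,
# as long as the schema is known up front.
write_deltalake("./example-table", batches(), schema=schema, engine="rust")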