|
2 | 2 |
|
3 | 3 | from __future__ import annotations |
4 | 4 |
|
| 5 | +import io |
5 | 6 | import random |
6 | 7 | from pathlib import Path |
7 | 8 |
|
| 9 | +import nested_pandas as npd |
8 | 10 | import numpy as np |
9 | 11 | import pandas as pd |
10 | 12 | import pyarrow as pa |
11 | | -import pyarrow.dataset as pds |
12 | 13 | import pyarrow.parquet as pq |
| 14 | +from astropy.io.votable.tree import FieldRef, Group, Param, VOTableFile |
| 15 | +from astropy.table import Table |
13 | 16 | from upath import UPath |
14 | 17 |
|
15 | 18 | from hats.io import file_io, paths |
@@ -165,29 +168,6 @@ def write_parquet_metadata( |
165 | 168 | return total_rows |
166 | 169 |
|
167 | 170 |
|
168 | | -def read_row_group_fragments(metadata_file: str): |
169 | | - """Generator for metadata fragment row groups in a parquet metadata file. |
170 | | -
|
171 | | - Parameters |
172 | | - ---------- |
173 | | - metadata_file : str |
174 | | - path to `_metadata` file. |
175 | | -
|
176 | | - Yields |
177 | | - ------ |
178 | | - RowGroupFragment |
179 | | - metadata for individual row groups |
180 | | - """ |
181 | | - metadata_file = get_upath(metadata_file) |
182 | | - if not file_io.is_regular_file(metadata_file): |
183 | | - metadata_file = paths.get_parquet_metadata_pointer(metadata_file) |
184 | | - |
185 | | - dataset = pds.parquet_dataset(metadata_file.path, filesystem=metadata_file.fs) |
186 | | - |
187 | | - for frag in dataset.get_fragments(): |
188 | | - yield from frag.row_groups |
189 | | - |
190 | | - |
191 | 171 | def _nonemin(value1, value2): |
192 | 172 | """Similar to numpy's nanmin, but excludes `None` values. |
193 | 173 |
|
@@ -529,3 +509,196 @@ def per_pixel_statistics( |
529 | 509 | {stat_name: int for stat_name in int_col_names} |
530 | 510 | ) |
531 | 511 | return frame |
| 512 | + |
| 513 | + |
| 514 | +def pick_metadata_schema_file(catalog_base_dir: str | Path | UPath) -> UPath | None: |
| 515 | + """Determines the appropriate file to read for parquet metadata |
| 516 | + stored in the _common_metadata or _metadata files. |
| 517 | +
|
| 518 | + Parameters |
| 519 | + ---------- |
| 520 | + catalog_base_dir : str | Path | UPath |
| 521 | + base path for the catalog |
| 522 | +
|
| 523 | + Returns |
| 524 | + ------- |
| 525 | + UPath | None |
| 526 | + path to a parquet file containing metadata schema. |
| 527 | + """ |
| 528 | + common_metadata_file = paths.get_common_metadata_pointer(catalog_base_dir) |
| 529 | + common_metadata_exists = file_io.does_file_or_directory_exist(common_metadata_file) |
| 530 | + metadata_file = paths.get_parquet_metadata_pointer(catalog_base_dir) |
| 531 | + metadata_exists = file_io.does_file_or_directory_exist(metadata_file) |
| 532 | + if not (common_metadata_exists or metadata_exists): |
| 533 | + return None |
| 534 | + return common_metadata_file if common_metadata_exists else metadata_file |
| 535 | + |
| 536 | + |
| 537 | +# pylint: disable=protected-access |
| 538 | +def nested_frame_to_vo_schema( |
| 539 | + nested_frame: npd.NestedFrame, |
| 540 | + *, |
| 541 | + verbose: bool = False, |
| 542 | + field_units: dict | None = None, |
| 543 | + field_ucds: dict | None = None, |
| 544 | + field_descriptions: dict | None = None, |
| 545 | + field_utypes: dict | None = None, |
| 546 | +): |
| 547 | + """Create VOTableFile metadata, based on the names and types of fields in the NestedFrame. |
| 548 | + Add ancillary attributes to fields where they are provided in the optional dictionaries. |
| 549 | + Note on field names with nested columns: to include ancillary attributes (units, ucds, etc) |
| 550 | + for a nested sub-column, use dot notation (e.g. ``"lightcurve.band"``). You can add ancillary |
| 551 | + attributes for the entire nested column group using the nested column name (e.g. ``"lightcurve"``). |
| 552 | +
|
| 553 | + Parameters |
| 554 | + ---------- |
| 555 | + nested_frame : npd.NestedFrame |
| 556 | + nested frame representing catalog data. this can be empty, as we only need to |
| 557 | + know about the column names and types. |
| 558 | + verbose: bool |
| 559 | + Should we print out additional debugging statements about the vo metadata? |
| 560 | + field_units: dict | None |
| 561 | + dictionary mapping column names to astropy units (or string representation of units) |
| 562 | + field_ucds: dict | None |
| 563 | + dictionary mapping column names to UCDs (Uniform Content Descriptors) |
| 564 | + field_descriptions: dict | None |
| 565 | + dictionary mapping column names to free-text descriptions |
| 566 | + field_utypes: dict | None |
| 567 | + dictionary mapping column names to utypes |
| 568 | +
|
| 569 | + Returns |
| 570 | + ------- |
| 571 | + VOTableFile |
| 572 | + VO object containing all relevant metadata (but no data) |
| 573 | + """ |
| 574 | + field_units = field_units or {} |
| 575 | + field_ucds = field_ucds or {} |
| 576 | + field_descriptions = field_descriptions or {} |
| 577 | + field_utypes = field_utypes or {} |
| 578 | + |
| 579 | + # Collate and tidy up the column names and data types. |
| 580 | + df_types = nested_frame.to_pandas().dtypes |
| 581 | + names = [] |
| 582 | + data_types = [] |
| 583 | + for col in nested_frame.base_columns: |
| 584 | + names.append(col) |
| 585 | + data_types.append(str(df_types[col])) |
| 586 | + |
| 587 | + for col in nested_frame.nested_columns: |
| 588 | + for key, val in nested_frame[col].dtype.column_dtypes.items(): |
| 589 | + names.append(f"{col}.{key}") |
| 590 | + data_types.append(str(val)) |
| 591 | + # astropy.Table uses numpy-style dtypes, and this cleans up type strings. |
| 592 | + data_types = ["U" if "string" in t else t.removesuffix("[pyarrow]") for t in data_types] |
| 593 | + |
| 594 | + # Might have extra content for nested columns. |
| 595 | + named_descriptions = {key: field_descriptions[key] for key in field_descriptions if key in names} |
| 596 | + named_units = {key: field_units[key] for key in field_units if key in names} |
| 597 | + if verbose: |
| 598 | + dropped_keys_units = set(field_units.keys()) - set(named_units.keys()) |
| 599 | + dropped_keys_desc = set(field_descriptions.keys()) - set(named_descriptions.keys()) |
| 600 | + if dropped_keys_units or dropped_keys_desc: |
| 601 | + print("================== Extra Fields ==================") |
| 602 | + if dropped_keys_units: |
| 603 | + print(f"warning - dropping some units ({len(dropped_keys_units)}):") |
| 604 | + print(dropped_keys_units) |
| 605 | + if dropped_keys_desc: |
| 606 | + print(f"warning - dropping some descriptions ({len(dropped_keys_desc)}):") |
| 607 | + print(dropped_keys_desc) |
| 608 | + |
| 609 | + t = Table(names=names, dtype=data_types, units=named_units, descriptions=named_descriptions) |
| 610 | + |
| 611 | + votablefile = VOTableFile() |
| 612 | + votablefile = votablefile.from_table(t) |
| 613 | + |
| 614 | + ## TODO - add info to root resource, e.g. obsregime. |
| 615 | + |
| 616 | + ## Add groups for nested columns |
| 617 | + vo_table = votablefile.get_first_table() |
| 618 | + for col in nested_frame.nested_columns: |
| 619 | + new_group = Group(vo_table, name=col, config=vo_table._config, pos=vo_table._pos) |
| 620 | + if col in field_descriptions: |
| 621 | + new_group.description = field_descriptions[col] |
| 622 | + else: |
| 623 | + new_group.description = "multi-column nested format" |
| 624 | + vo_table.groups.append(new_group) |
| 625 | + |
| 626 | + new_param = Param(vo_table, name="is_nested_column", datatype="boolean", value="t") |
| 627 | + new_group.entries.append(new_param) |
| 628 | + |
| 629 | + for key in nested_frame[col].columns: |
| 630 | + new_field = FieldRef(vo_table, ref=f"{col}.{key}") |
| 631 | + new_group.entries.append(new_field) |
| 632 | + |
| 633 | + ## Go back and add UCD/utypes to fields |
| 634 | + for field in vo_table.iter_fields_and_params(): |
| 635 | + field_name = field.name |
| 636 | + if field_name in field_ucds: |
| 637 | + field.ucd = field_ucds[field_name] |
| 638 | + if field_name in field_utypes: |
| 639 | + field.utype = field_utypes[field_name] |
| 640 | + return votablefile |
| 641 | + |
| 642 | + |
| 643 | +def write_voparquet_in_common_metadata( |
| 644 | + catalog_base_dir: str | Path | UPath, |
| 645 | + *, |
| 646 | + verbose: bool = False, |
| 647 | + field_units: dict | None = None, |
| 648 | + field_ucds: dict | None = None, |
| 649 | + field_descriptions: dict | None = None, |
| 650 | + field_utypes: dict | None = None, |
| 651 | +): |
| 652 | + """Create VOTableFile metadata, based on the names and types of fields in the parquet files, |
| 653 | + and write to a ``catalog_base_dir/dataset/_common_metadata`` parquet file. |
| 654 | + Add ancillary attributes to fields where they are provided in the optional dictionaries. |
| 655 | + Note on field names with nested columns: to include ancillary attributes (units, ucds, etc) |
| 656 | + for a nested sub-column, use dot notation (e.g. ``"lightcurve.band"``). You can add ancillary |
| 657 | + attributes for the entire nested column group using the nested column name (e.g. ``"lightcurve"``). |
| 658 | +
|
| 659 | + Parameters |
| 660 | + ---------- |
| 661 | + catalog_base_dir : str | Path | UPath |
| 662 | + base path for the catalog |
| 663 | + verbose: bool |
| 664 | + Should we print out additional debugging statements about the vo metadata? |
| 665 | + field_units: dict | None |
| 666 | + dictionary mapping column names to astropy units (or string representation of units) |
| 667 | + field_ucds: dict | None |
| 668 | + dictionary mapping column names to UCDs (Uniform Content Descriptors) |
| 669 | + field_descriptions: dict | None |
| 670 | + dictionary mapping column names to free-text descriptions |
| 671 | + field_utypes: dict | None |
| 672 | + dictionary mapping column names to utypes |
| 673 | + """ |
| 674 | + schema_file = pick_metadata_schema_file(catalog_base_dir=catalog_base_dir) |
| 675 | + if not schema_file: |
| 676 | + return |
| 677 | + nested_frame = npd.read_parquet(schema_file) |
| 678 | + votablefile = nested_frame_to_vo_schema( |
| 679 | + nested_frame=nested_frame, |
| 680 | + verbose=verbose, |
| 681 | + field_units=field_units, |
| 682 | + field_ucds=field_ucds, |
| 683 | + field_descriptions=field_descriptions, |
| 684 | + field_utypes=field_utypes, |
| 685 | + ) |
| 686 | + |
| 687 | + xml_bstr = io.BytesIO() |
| 688 | + votablefile.to_xml(xml_bstr) |
| 689 | + xml_str = xml_bstr.getvalue().decode("utf-8") |
| 690 | + if verbose: |
| 691 | + print("================== Table XML ==================") |
| 692 | + print(xml_str) |
| 693 | + |
| 694 | + pa_schema = file_io.read_parquet_metadata(schema_file).schema.to_arrow_schema() |
| 695 | + |
| 696 | + original_metadata = pa_schema.metadata or {} |
| 697 | + updated_metadata = original_metadata | { |
| 698 | + b"IVOA.VOTable-Parquet.version": b"1.0", |
| 699 | + b"IVOA.VOTable-Parquet.content": xml_str, |
| 700 | + } |
| 701 | + |
| 702 | + pa_schema = pa_schema.with_metadata(updated_metadata) |
| 703 | + |
| 704 | + file_io.write_parquet_metadata(pa_schema, schema_file) |
0 commit comments