diff --git a/conf.py b/conf.py index 93523856..fd3d570e 100644 --- a/conf.py +++ b/conf.py @@ -49,9 +49,13 @@ # Ignore them here. nb_execution_excludepatterns += ['wise-allwise-catalog-demo.md', 'Parallelize_Convolution.md'] +if 'CI' in os.environ: + # Both NEOWISE parquet notebooks work with large data that doesn't work within CircleCI or GHA resource limits + nb_execution_excludepatterns += ['neowise-source-table-strategies.md', 'neowise-source-table-lightcurves.md'] + if platform.platform().startswith("mac") or platform.platform().startswith("win"): # The way the notebooks use the multiprocessing module is known to not work on non-Linux - nb_execution_excludepatterns += ['Parallelize_Convolution.md'] + nb_execution_excludepatterns += ['Parallelize_Convolution.md', 'neowise-source-table-lightcurves.md'] # -- Options for HTML output ------------------------------------------------- diff --git a/ignore_gha_testing b/ignore_gha_testing new file mode 100644 index 00000000..0fcd2be0 --- /dev/null +++ b/ignore_gha_testing @@ -0,0 +1,2 @@ +tutorials/parquet-catalog-demos/neowise-source-table-lightcurves +tutorials/parquet-catalog-demos/neowise-source-table-strategies diff --git a/index.md b/index.md index a77dcc7d..8f5aa2ee 100644 --- a/index.md +++ b/index.md @@ -33,6 +33,8 @@ caption: Cloud data access tutorials/cloud_access/cloud-access-intro tutorials/parquet-catalog-demos/wise-allwise-catalog-demo +tutorials/parquet-catalog-demos/neowise-source-table-strategies +tutorials/parquet-catalog-demos/neowise-source-table-lightcurves tutorials/openuniversesims/openuniverse2024_roman_simulated_timedomainsurvey tutorials/openuniversesims/openuniverse2024_roman_simulated_wideareasurvey diff --git a/tox.ini b/tox.ini index 57069b2d..75982fff 100644 --- a/tox.ini +++ b/tox.ini @@ -37,6 +37,7 @@ commands = # too due to issues with e.g. multiprocessing and problems in upstream dependency !buildhtml: bash -c 'if python -c "import platform; print(platform.platform())" | grep -i macos; then cat ignore_osx_testing >> ignore_testing; fi' !buildhtml: bash -c 'if python -c "import platform; print(platform.platform())" | grep -i win; then cat ignore_windows_testing >> ignore_testing; fi' + !buildhtml: bash -c 'if [[ $CI == true ]]; then cat ignore_gha_testing >> ignore_testing; fi' !buildhtml: bash -c 'find tutorials -name "*.md" | grep -vf ignore_testing | xargs jupytext --to notebook ' !buildhtml: pytest --nbval-lax -vv --durations=10 tutorials diff --git a/tutorials/parquet-catalog-demos/neowise-source-table-lightcurves.md b/tutorials/parquet-catalog-demos/neowise-source-table-lightcurves.md new file mode 100644 index 00000000..5f459631 --- /dev/null +++ b/tutorials/parquet-catalog-demos/neowise-source-table-lightcurves.md @@ -0,0 +1,462 @@ +--- +jupytext: + text_representation: + extension: .md + format_name: myst + format_version: 0.13 + jupytext_version: 1.16.1 +kernelspec: + display_name: science_demo + language: python + name: python3 +--- + +# Make Light Curves from NEOWISE Single-exposure Source Table + ++++ + +Learning Goals: + +- Search the NEOWISE Single-exposure Source Table (Parquet version) for the light curves of a + set of targets with RA/Dec coordinates. + - Write a pyarrow dataset filter and use it to load the NEOWISE detections near each target (rough cut). + - Match targets to detections using an astropy cone search (precise cut). + - Parallelize this. +- Plot the light curves. + ++++ + +## 1. 
Introduction + +This notebook loads light curves from the +[NEOWISE](https://irsa.ipac.caltech.edu/Missions/wise.html) Single-exposure Source Table +for a sample of about 2000 cataclysmic variables from [Downes et al. (2001)](https://doi.org/10.1086/320802). +The NEOWISE Single-exposure Source Table is a very large catalog -- 10 years and 40 terabytes in total +with 145 columns and 200 billion rows. +When searching this catalog, it is important to consider the requirements of your use case and +the format of this dataset. +This notebook applies the techniques developed in +[Strategies to Efficiently Work with NEOWISE Single-exposure Source Table in Parquet](https://irsa.ipac.caltech.edu/docs/notebooks/neowise-source-table-strategies.html). +This is a fully-worked example that demonstrates the important steps, but note that this is a +relatively small use case for the Parquet version of the dataset. + +The specific strategy we employ is: + +- Choose a cone search radius that determines which NEOWISE source detections to associate + with each target. +- Load the sample of CV targets. +- Calculate the indexes of all HEALPix order k=5 pixels within the radius of each target. + These are the dataset partitions that need to be searched. +- Parallelize over the partitions using `multiprocessing.Pool`. + For each pixel: + - Construct a dataset filter for NEOWISE sources in the vicinity of the targets in the partition. + - Load data, applying the filter. In our case, the number of rows loaded will be fairly small. + - Do a cone search to match sources with targets in the partition. + - Return the results. +- Concatenate the cone search results, groupby target ID, and sort by time to construct the light curves. + +The efficiency of this method will increase with the number of rows needed from each partition. +For example, a cone search radius of 1 arcsec will require about 10 CPUs, 65G RAM, and +50 minutes to load the data from all 10 NEOWISE years. +Increasing the radius to 10 arcsec will return about 2.5x more rows using roughly the same resources. +Increasing the target sample size can result in similar efficiency gains. +To try out this notebook with fewer resources, use a subset of NEOWISE years. +Using one year is expected to require about 5 CPUs, 20G RAM, and 10 minutes. +These estimates are based on testing in science platform environments. +Your numbers will vary based on many factors including compute power, bandwidth, and physical distance from the data. + ++++ + +## 2. Imports + +```{code-cell} ipython3 +# Uncomment the next line to install dependencies if needed. +# !pip install astropy astroquery hpgeom matplotlib pandas pyarrow pyvo +``` + +```{code-cell} ipython3 +import multiprocessing # parallelization + +import astroquery.vizier # fetch the sample of CV targets +import hpgeom # HEALPix math +import numpy as np # math +import pandas as pd # manipulate tabular data +import pyarrow.compute # construct dataset filters +import pyarrow.dataset # load and query the NEOWISE dataset +import pyarrow.fs # interact with the S3 bucket storing the NEOWISE catalog +import pyvo # TAP service for the Vizier query +from astropy import units as u # manipulate astropy quantities +from astropy.coordinates import SkyCoord # manipulate sky coordinates +from matplotlib import pyplot as plt # plot light curves + +# copy-on-write will become the default in pandas 3.0 and is generally more performant +pd.options.mode.copy_on_write = True +``` + +## 3. 
Setup + ++++ + +### 3.1 Define variables + +First, choose which NEOWISE years to include. +Real use cases are likely to require all ten years but it can be helpful to start with +fewer while exploring to make things run faster. + +```{code-cell} ipython3 +YEARS = list(range(1, 11)) # all years => about 11 CPU, 65G RAM, and 50 minutes runtime + +# To try out a smaller version of the notebook, +# uncomment the next line and choose your own subset of years. +# YEARS = [10] # one year => about 5 CPU, 20G RAM, and 10 minutes runtime +``` + +```{code-cell} ipython3 +# sets of columns that we'll need +FLUX_COLUMNS = ["w1flux", "w2flux"] +LIGHTCURVE_COLUMNS = ["mjd"] + FLUX_COLUMNS +COLUMN_SUBSET = ["cntr", "ra", "dec"] + LIGHTCURVE_COLUMNS + +# cone-search radius defining which NEOWISE sources are associated with each target object +MATCH_RADIUS = 1 * u.arcsec +``` + +### 3.2 Load NEOWISE metadata + ++++ + +The metadata contains column names, schema, and row-group statistics for every file in the dataset. +We'll load it as a pyarrow dataset. + +```{code-cell} ipython3 +# This catalog is so big that even the metadata is big. +# Expect this cell to take about 30 seconds per year. + +# This information can be found at https://irsa.ipac.caltech.edu/cloud_access/. +bucket = "nasa-irsa-wise" +base_prefix = "wise/neowiser/catalogs/p1bs_psd/healpix_k5" +metadata_path = ( + lambda yr: f"{bucket}/{base_prefix}/year{yr}/neowiser-healpix_k5-year{yr}.parquet/_metadata" +) +fs = pyarrow.fs.S3FileSystem(region="us-west-2", anonymous=True) + +# list of datasets, one per year +year_datasets = [ + pyarrow.dataset.parquet_dataset(metadata_path(yr), filesystem=fs, partitioning="hive") + for yr in YEARS +] + +# unified dataset, all years +neowise_ds = pyarrow.dataset.dataset(year_datasets) +``` + +## 4. Define functions to filter and load data + ++++ + +These functions will be used in the next section. +Defining them here in the notebook is useful for demonstration and should work seamlessly on Linux, which includes most science platforms. +Mac and Windows users should see the note at the end of the notebook. +However, note that this use case is likely too large for a laptop and may perform poorly and/or crash if attempted. + +```{code-cell} ipython3 +# If you have your own list of target objects, replace this function to load your sample. +def load_targets_Downes2001(radius=1 * u.arcsec): + """Load a sample of targets and return a pandas DataFrame. + + References: + - Downes et al., 2001 ([2001PASP..113..764D](https://ui.adsabs.harvard.edu/abs/2001PASP..113..764D/abstract)). + - https://cdsarc.cds.unistra.fr/ftp/V/123A/ReadMe + + Parameters + ---------- + radius : astropy.Quantity (optional) + Radius for the cone search around each target. This is used to determine which partition(s) + need to be searched for a given target. Use the same radius here as in the rest of the notebook. + + Returns + ------- + pandas.DataFrame + The loaded targets with the following columns: + - uid: Unique identifier of the target. + - GCVS: Name in the General Catalogue of Variable Stars if it exists, else the constellation name. + - RAJ2000: Right Ascension of the target in J2000 coordinates. + - DEJ2000: Declination of the target in J2000 coordinates. + - healpix_k5: HEALPix pixel index at order k=5. 
+ """ + astroquery.vizier.Vizier.ROW_LIMIT = -1 + # https://cdsarc.cds.unistra.fr/vizier/notebook.gml?source=V/123A + # https://cdsarc.cds.unistra.fr/ftp/V/123A/ReadMe + CATALOGUE = "V/123A" + voresource = pyvo.registry.search(ivoid=f"ivo://CDS.VizieR/{CATALOGUE}")[0] + tap_service = voresource.get_service("tap") + + # Query Vizier and load targets to a dataframe. + cv_columns = ["uid", "GCVS", "RAJ2000", "DEJ2000"] + cvs_records = tap_service.run_sync( + f'SELECT {",".join(cv_columns)} from "{CATALOGUE}/cv"' + ) + cvs_df = cvs_records.to_table().to_pandas() + + # Add a new column containing a list of all order k HEALPix pixels that overlap with + # the CV's position plus search radius. + cvs_df["healpix_k5"] = [ + hpgeom.query_circle( + a=cv.RAJ2000, + b=cv.DEJ2000, + radius=radius.to_value("deg"), + nside=hpgeom.order_to_nside(order=5), + nest=True, + inclusive=True, + ) + for cv in cvs_df.itertuples() + ] + # Explode the lists of pixels so the dataframe has one row per target per pixel. + # Targets near a pixel boundary will now have more than one row. + # Later, we'll search each pixel separately for NEOWISE detections and then + # concatenate the matches for each target to produce complete light curves. + cvs_df = cvs_df.explode("healpix_k5", ignore_index=True) + + return cvs_df +``` + +```{code-cell} ipython3 +# This is the main function. +def load_lightcurves_one_partition(targets_group): + """Load lightcurves from a single partition. + + Parameters + ---------- + targets_group : tuple + Tuple of pixel index and sub-DataFrame (result of DataFrame groupby operation). + + Returns + ------- + pd.DataFrame + The lightcurves for targets found in this partition. + """ + # These global variables will be set when the worker is initialized. + global _neowise_ds + global _columns + global _radius + + # Get row filters that will limit the amount of data loaded from this partition. + # It is important for these filters to be efficient for the specific use case. + filters = _construct_dataset_filters(targets_group=targets_group, radius=_radius) + + # Load this slice of the dataset to a pyarrow Table. + pixel_tbl = _neowise_ds.to_table(columns=_columns, filter=filters) + + # Associate NEOWISE detections with targets to get the light curves. + lightcurves_df = _cone_search( + targets_group=targets_group, pixel_tbl=pixel_tbl, radius=_radius + ) + + return lightcurves_df +``` + +```{code-cell} ipython3 +# The filters returned by this function need to be efficient for the specific use case. +def _construct_dataset_filters(*, targets_group, radius, scale_factor=100): + """Construct dataset filters for a box search around all targets in the partition. + + Parameters + ---------- + targets_group : tuple + Tuple of pixel index and sub-DataFrame (result of DataFrame groupby operation). + radius : astropy.Quantity + The radius used for constructing the RA and Dec filters. + scale_factor : int (optional) + Factor by which the radius will be multiplied to ensure that the box encloses + all relevant detections. + + Returns + ------- + filters : pyarrow.compute.Expression + The constructed filters based on the given inputs. + """ + pixel, locations_df = targets_group + + # Start with a filter for the partition. This is the most important one because + # it tells the Parquet reader to just skip all the other partitions. + filters = pyarrow.compute.field("healpix_k5") == pixel + + # Add box search filters. For our CV sample, one box encompassing all targets in + # the partition should be sufficient. 
Make a different choice if you use a different + # sample and find that this loads more data than you want to handle at once. + buffer_dist = scale_factor * radius.to_value("deg") + for coord, target_coord in zip(["ra", "dec"], ["RAJ2000", "DEJ2000"]): + coord_fld = pyarrow.compute.field(coord) + + # Add a filter for coordinate lower limit. + coord_min = locations_df[target_coord].min() + filters = filters & (coord_fld > coord_min - buffer_dist) + + # Add a filter for coordinate upper limit. + coord_max = locations_df[target_coord].max() + filters = filters & (coord_fld < coord_max + buffer_dist) + + # Add your own additional requirements here, like magnitude limits or quality cuts. + # See the AllWISE notebook for more filter examples and links to pyarrow documentation. + # We'll add a filter for sources not affected by contamination or confusion. + filters = filters & pyarrow.compute.equal(pyarrow.compute.field("cc_flags"), "0000") + + return filters +``` + +```{code-cell} ipython3 +def _cone_search(*, targets_group, pixel_tbl, radius): + """Perform a cone search to select NEOWISE detections belonging to each target object. + + Parameters + ---------- + targets_group : tuple + Tuple of pixel index and sub-DataFrame (result of DataFrame groupby operation). + pixel_tbl : pyarrow.Table + Table of NEOWISE data for a single pixel + radius : astropy.Quantity + Cone search radius. + + Returns + ------- + match_df : pd.DataFrame + A dataframe with all matched sources. + """ + _, targets_df = targets_group + + # Cone search using astropy to select NEOWISE detections belonging to each object. + pixel_skycoords = SkyCoord(ra=pixel_tbl["ra"] * u.deg, dec=pixel_tbl["dec"] * u.deg) + targets_skycoords = SkyCoord(targets_df["RAJ2000"], targets_df["DEJ2000"], unit=u.deg) + targets_ilocs, pixel_ilocs, _, _ = pixel_skycoords.search_around_sky( + targets_skycoords, radius + ) + + # Create a dataframe with all matched source detections. + match_df = pixel_tbl.take(pixel_ilocs).to_pandas() + + # Add the target IDs by joining with targets_df. + match_df["targets_ilocs"] = targets_ilocs + match_df = match_df.set_index("targets_ilocs").join(targets_df.reset_index().uid) + + return match_df +``` + +```{code-cell} ipython3 +# This function will be called once for each worker in the pool. +def init_worker(neowise_ds, columns, radius): + """Set global variables '_neowise_ds', '_columns', and '_radius'. + + These variables will be the same for every call to 'load_lightcurves_one_partition' + and will be set once for each worker. It is important to pass 'neowise_ds' this + way because of its size and the way it will be used. (For the other two, it makes + little difference whether we use this method or pass them directly as function + arguments to 'load_lightcurves_one_partition'.) + + Parameters + ---------- + neowise_ds : pyarrow.dataset.Dataset + NEOWISE metadata loaded as a PyArrow dataset. + columns : list + Columns to include in the output DataFrame of light curves. + radius : astropy.Quantity + Cone search radius. + """ + global _neowise_ds + _neowise_ds = neowise_ds + global _columns + _columns = columns + global _radius + _radius = radius + +``` + +## 5. Load light curves + ++++ + +Load the target objects' coordinates and other info. + +```{code-cell} ipython3 +targets_df = load_targets_Downes2001(radius=MATCH_RADIUS) +targets_df.head() +``` + +Search the NEOWISE Source Table for all targets (positional matches) and load the light curves. +Partitions are searched in parallel. 
+For targets located near partition boundaries, relevant partitions will be searched +independently for the given target and the results will be concatenated. +If searching all NEOWISE years, this may take 45 minutes or more. + +```{code-cell} ipython3 +# Group targets by partition. 'load_lightcurves_one_partition' will be called once per group. +targets_groups = targets_df.groupby("healpix_k5") +# Arguments for 'init_worker'. +init_args = (neowise_ds, COLUMN_SUBSET, MATCH_RADIUS) + +# Start a multiprocessing pool and load the target light curves in parallel. +# About 1900 unique pixels in targets_df, 8 workers, 48 chunksize => ~5 chunks per worker. +nworkers = 8 +chunksize = 48 +with multiprocessing.Pool(nworkers, initializer=init_worker, initargs=init_args) as pool: + lightcurves = [] + for lightcurves_df in pool.imap_unordered( + load_lightcurves_one_partition, targets_groups, chunksize=chunksize + ): + lightcurves.append(lightcurves_df) +neowise_lightcurves_df = pd.concat(lightcurves).sort_values("mjd").reset_index(drop=True) +``` + +```{code-cell} ipython3 +neowise_lightcurves_df.head() +``` + +## 6. Plot NEOWISE light curves + +The light curves will have large gaps due to the observing cadence, so we'll plot each +"epoch" separately to see them better. + +```{code-cell} ipython3 +# get the light curves of the target with the most data +target_uid = neowise_lightcurves_df.groupby("uid").mjd.count().sort_values().index[-1] +target_df = neowise_lightcurves_df.loc[neowise_lightcurves_df.uid == target_uid] + +# list of indexes that separate epochs (arbitrarily at delta mjd > 30) +epoch_idxs = target_df.loc[target_df.mjd.diff() > 30].index.to_list() +epoch_idxs = epoch_idxs + [target_df.index[-1]] # add the final index + +# make the figure +ncols = 4 +nrows = int(np.ceil(len(epoch_idxs) / ncols)) +fig, axs = plt.subplots(nrows, ncols, sharey=True, figsize=(3 * ncols, 2.5 * nrows)) +axs = axs.flatten() +idx0 = target_df.index[0] +for i, (idx1, ax) in enumerate(zip(epoch_idxs, axs)): + epoch_df = target_df.loc[idx0 : idx1 - 1, LIGHTCURVE_COLUMNS].set_index("mjd") + for col in FLUX_COLUMNS: + ax.plot(epoch_df[col], ".", markersize=3, label=col) + ax.set_title(f"epoch {i}") + ax.xaxis.set_ticks( # space by 10 + range(int((ax.get_xlim()[0] + 10) / 10) * 10, int(ax.get_xlim()[1]), 10) + ) + idx0 = idx1 +axs[0].legend() +fig.supxlabel("MJD") +fig.supylabel("RAW FLUX") +fig.suptitle(f"NEOWISE light curves for target CV {target_uid}") +fig.tight_layout() +plt.show(block=False) +``` + +----- + +[*] Note to Mac and Windows users: + +You will need to copy the functions and imports from this notebook into a separate '.py' file and then import them in order to use the multiprocessing pool for parallelization. +In addition, you may need to load `neowise_ds` separately for each child process (i.e., worker) by adding that code to the `init_worker` function instead of passing it in as an argument. +This has to do with differences in what does / does not get copied into the child processes on different platforms. 
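
For concreteness, here is a minimal sketch of that approach. The module name `neowise_helpers.py` is only a placeholder, and details may need adjusting for your environment; the key points are that the worker functions live in an importable file and that `neowise_ds` is rebuilt inside `init_worker` instead of being passed through `initargs`.

```python
# neowise_helpers.py -- hypothetical module name; a sketch, not a drop-in replacement.
# Copy load_lightcurves_one_partition, _construct_dataset_filters, and _cone_search
# into this file unchanged, then rebuild the dataset inside each worker:
import pyarrow.dataset
import pyarrow.fs


def init_worker(years, columns, radius):
    """Build the NEOWISE dataset inside the child process instead of receiving it."""
    global _neowise_ds, _columns, _radius
    fs = pyarrow.fs.S3FileSystem(region="us-west-2", anonymous=True)
    bucket = "nasa-irsa-wise"
    base_prefix = "wise/neowiser/catalogs/p1bs_psd/healpix_k5"
    _neowise_ds = pyarrow.dataset.dataset(
        [
            pyarrow.dataset.parquet_dataset(
                f"{bucket}/{base_prefix}/year{yr}/neowiser-healpix_k5-year{yr}.parquet/_metadata",
                filesystem=fs,
                partitioning="hive",
            )
            for yr in years
        ]
    )
    _columns = columns
    _radius = radius
```

The notebook would then do `from neowise_helpers import init_worker, load_lightcurves_one_partition` and pass `initargs=(YEARS, COLUMN_SUBSET, MATCH_RADIUS)`, all of which are small, picklable objects that spawned child processes can receive.
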
+ +About this notebook: + +- Author: Troy Raen (Applications Developer, IRSA) and the IPAC Science Platform team +- Contact: [https://irsa.ipac.caltech.edu/docs/help_desk.html](https://irsa.ipac.caltech.edu/docs/help_desk.html) +- Updated: 2024-08-08 diff --git a/tutorials/parquet-catalog-demos/neowise-source-table-strategies.md b/tutorials/parquet-catalog-demos/neowise-source-table-strategies.md new file mode 100644 index 00000000..9c60b5a9 --- /dev/null +++ b/tutorials/parquet-catalog-demos/neowise-source-table-strategies.md @@ -0,0 +1,562 @@ +--- +jupytext: + text_representation: + extension: .md + format_name: myst + format_version: 0.13 + jupytext_version: 1.16.1 +kernelspec: + display_name: science_demo + language: python + name: python3 +--- + +# Strategies to Efficiently Work with NEOWISE Single-exposure Source Table in Parquet + ++++ + +This notebook discusses strategies for working with the Apache Parquet version of the +[NEOWISE](https://irsa.ipac.caltech.edu/Missions/wise.html) Single-exposure Source Table +and provides the basic code needed for each approach. +This is a very large catalog -- 10 years and 40 terabytes in total with 145 columns and 200 billion rows. +Most of the work shown in this notebook is how to efficiently deal with so much data. + +Learning Goals: + +- Identify use cases that will benefit most from using this Parquet format. +- Understand how this dataset is organized and three different ways to efficiently + slice it in order to obtain a subset small enough to load into memory and process. +- Feel prepared to apply these methods to a new use case and determine efficient filtering and slicing options. + ++++ + +## 1. Introduction + ++++ + +The NEOWISE Single-exposure Source Table comprises 10 years of data. +Each year on its own would be considered "large" compared to astronomy catalogs produced +contemporaneously, so working with the full dataset requires extra consideration. +In this Parquet version, each year is stored as an independent Parquet dataset. +This tutorial shows how to combine them and work with all years as one. +The data are partitioned by HEALPix ([Górski et al., 2005](https://doi.org/10.1086/427976)) order k=5. +HEALPix is a tessellation of the sky, so partitioning the dataset this way makes it especially +efficient for spatial queries. +In addition, the access methods shown below are expected to perform well when parallelized. + +The terms "partition", "filter", and "slice" all relate to subsets of data. +In this notebook we'll use them with definitions that overlap but are not identical, as follows. +A "partition" includes all data observed in a single HEALPix pixel. +This data is grouped together in the Parquet files. +A "filter" is a set of criteria defined by the user and applied when reading data so that only the desired rows are loaded. +We'll use it exclusively to refer to a PyArrow `filter`. +The criteria can include partitions and/or any other column in the dataset. +A "slice" is a generic subset of data. +There are a few ways to obtain a slice; one is by applying a filter. + +This notebook is expected to require about 2 CPUs and 50G RAM and to complete in about 10 minutes. +These estimates are based on testing in science platform environments. +Your numbers will vary based on many factors including compute power, bandwidth, and physical distance from the data. +The required RAM and runtime can be reduced by limiting the number of NEOWISE years loaded. 
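
To make the "filter" terminology above concrete, the sketch below (shown for illustration only and not executed in this notebook) builds a PyArrow expression that combines a partition cut with an ordinary column cut. The pixel index and flux threshold are arbitrary example values; Section 4 applies a partition filter like this to a real read.

```python
import pyarrow.compute

# A filter is just a pyarrow expression; no data is read until it is passed to a
# dataset read, e.g. Dataset.to_table(filter=...).
partition_cut = pyarrow.compute.field("healpix_k5") == 1000  # select one partition
flux_cut = pyarrow.compute.field("w1flux") > 0  # any other catalog column can be used as well
example_filter = partition_cut & flux_cut  # combine criteria with "&" (and) or "|" (or)
```
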
+ ++++ + +### 1.1 When to use the Parquet version + +IRSA provides large catalogs in file formats (e.g., Parquet) primarily to support use cases that require bulk access. +The Parquet version of the NEOWISE Source Table, coupled with the methods demonstrated in this tutorial, +are expected to be the most efficient option for large-ish use cases like classifying, clustering, or +building light curves for many objects. +In general, the efficiency will increase with the number of rows required from each slice because the slice +only needs to be searched once regardless of the number of rows that are actually loaded. +In addition, this access route tends to perform well when parallelized. +Note that these use cases (including this notebook) are often too large for a laptop and may perform +poorly and/or crash if attempted. + +For small-ish use cases like searching for a handful of objects, other access routes like +PyVO and TAP queries will be faster. + +Consider using this tutorial if either of the following is true: + +- Your use case is large enough that you are considering parallelizing your code to speed it up. +- Your sample size is large enough that loading the data using a different method is likely to + take hours, days, or longer. + ++++ + +### 1.2 Recommended approach + +The basic process is: + +1. Load the catalog metadata as a PyArrow dataset. +2. Decide how to slice the dataset (e.g., by year, partition, and/or file) depending on your use case. +3. Iterate and/or parallelize over the slices. For each slice: + 1. Use the PyArrow dataset to load data of interest, applying row filters during the read. + 2. Process the data as you like (e.g., cluster, classify, etc.). + 3. Write your results to disk and/or return them for further processing. +4. (Optional) Concatenate your results and continue processing. + +This notebook covers steps 1 through 3.1 and indicates where to insert your own code to proceed with 3.2. +Here we iterate over slices, but the same code can be parallelized using any multi-processing framework. +A fully-worked example is shown in the light curve notebook linked below. + ++++ + +### 1.3 See also + +- [IRSA Cloud Access Intro](https://irsa.ipac.caltech.edu/docs/notebooks/cloud-access-intro.html) +- [AllWISE Source Catalog Demo](https://irsa.ipac.caltech.edu/docs/notebooks/wise-allwise-catalog-demo.html) +- [Make Light Curves from NEOWISE Single-exposure Source Table](https://irsa.ipac.caltech.edu/docs/notebooks/neowise-source-table-lightcurves.html) + ++++ + +## 2. Imports + +```{code-cell} ipython3 +# Uncomment the next line to install dependencies if needed. +# !pip install hpgeom pandas pyarrow +``` + +```{code-cell} ipython3 +import re # parse strings +import sys # check size of loaded data + +import hpgeom # HEALPix math +import pandas as pd # store and manipulate table data +import pyarrow.compute # construct dataset filters +import pyarrow.dataset # load and query the NEOWISE dataset +import pyarrow.fs # interact with the S3 bucket storing the NEOWISE catalog +``` + +## 3. Setup + +### 3.1 Define variables and helper functions + ++++ + +Choose which NEOWISE years to include. +Expect the notebook to require about 4G RAM and 1 minute of runtime per year. + +```{code-cell} ipython3 +# All NEOWISE years => about 40G RAM and 10 minutes runtime +YEARS = list(range(1, 11)) + +# To reduce the needed RAM or runtime, uncomment the next line and choose your own years. +# Years 1 and 9 are needed for the median_file and biggest_file (defined below). 
+# YEARS = [1, 9] +``` + +Column and partition variables: + +```{code-cell} ipython3 +# subset of columns to load +flux_columns = ["w1flux", "w1sigflux", "w2flux", "w2sigflux"] +COLUMN_SUBSET = ["cntr", "source_id", "ra", "dec"] + flux_columns + +# partitioning info. do not change these values. +K = 5 # healpix order of the catalog partitioning +KCOLUMN = "healpix_k5" # partitioning column name +KFIELD = pyarrow.compute.field(KCOLUMN) # pyarrow compute field, to be used in filters +``` + +Paths: + +```{code-cell} ipython3 +# We're going to look at several different files, so make a function to return the path. +def neowise_path(year, file="_metadata"): + """Return the path to a file. Default is "_metadata" file of the given year's dataset. + + Parameters + ---------- + year : int + NEOWISE year for which the path is being generated. + file : str + The name of the file to the returned path. + + Returns + ------- + str + The path to the file. + """ + # This information can be found at https://irsa.ipac.caltech.edu/cloud_access/. + bucket = "nasa-irsa-wise" + base_prefix = "wise/neowiser/catalogs/p1bs_psd/healpix_k5" + root_dir = f"{bucket}/{base_prefix}/year{year}/neowiser-healpix_k5-year{year}.parquet" + return f"{root_dir}/{file}" +``` + +Some representative partitions and files (see dataset stats in the Appendix for how we determine these values): + +```{code-cell} ipython3 +# pixel index of the median partition and the biggest partition by number of rows +median_part = 10_936 +biggest_part = 8_277 + +# path to the median file and the biggest file by file size on disk (see Appendix) +median_file = neowise_path(9, "healpix_k0=3/healpix_k5=3420/part0.snappy.parquet") +biggest_file = neowise_path(1, "healpix_k0=2/healpix_k5=2551/part0.snappy.parquet") +``` + +Convenience function for displaying a table size: + +```{code-cell} ipython3 +# We'll use this function throughout the notebook to see how big different tables are. +def print_table_size(table, pixel_index=None): + """Prints the shape (rows x columns) and size (GiB) of the given table. + + Parameters + ---------- + table : pyarrow.Table + The table for which to print the size. + pixel_index : int or str or None + The pixel index corresponding to the partition this table was loaded from. + """ + if pixel_index is not None: + print(f"pixel index: {pixel_index}") + print(f"table shape: {table.num_rows:,} rows x {table.num_columns} columns") + print(f"table size: {sys.getsizeof(table) / 1024**3:.2f} GiB") +``` + +### 3.2 Load NEOWISE metadata as a pyarrow dataset + ++++ + +The metadata contains column names, schema, and row-group statistics for every file in the dataset. +Later, we will use this pyarrow dataset object to slice and query the catalog in several different ways. + +```{code-cell} ipython3 +# This catalog is so big that even the metadata is big. +# Expect this cell to take about 30 seconds per year. +fs = pyarrow.fs.S3FileSystem(region="us-west-2", anonymous=True) + +# list of datasets, one per year +year_datasets = [ + pyarrow.dataset.parquet_dataset(neowise_path(yr), filesystem=fs, partitioning="hive") + for yr in YEARS +] + +# unified dataset, all years +neowise_ds = pyarrow.dataset.dataset(year_datasets) +``` + +`neowise_ds` is a [UnionDataset](https://arrow.apache.org/docs/python/generated/pyarrow.dataset.UnionDataset.html). +All methods demonstrated for pyarrow datasets in the AllWISE demo notebook can be used with +`neowise_ds` and will be applied to all years as if they were a single dataset. 
+In addition, a separate [Dataset](https://arrow.apache.org/docs/python/generated/pyarrow.dataset.Dataset.html) +for each year is stored in the list attribute `neowise_ds.children` (== `year_datasets`, loaded above), +and the same methods can be applied to them individually. + ++++ + +## 4. Example: Slice by partition + ++++ + +This example shows how to load data from each partition separately. +The actual "slicing" is done by applying a filter to the pyarrow dataset `neowise_ds`. +Constructing filters was discussed in the AllWISE notebook linked above. + +```{code-cell} ipython3 +# number of order K pixels covering the full sky +npixels = hpgeom.nside_to_npixel(hpgeom.order_to_nside(order=K)) + +# iterate over all partitions +for pix in range(npixels): + + # slice and load to get all rows in this partition, subset of columns + pixel_tbl = neowise_ds.to_table(filter=(KFIELD == pix), columns=COLUMN_SUBSET) + + # insert your code here to continue processing + + # we'll just print the table size to get a sense of how much data has been loaded + print_table_size(table=pixel_tbl, pixel_index=pix) + + # when done, you may want to delete pixel_tbl to free the memory + del pixel_tbl + # we'll stop after one partition + break +``` + +`pixel_tbl` is a (pyarrow) [Table](https://arrow.apache.org/docs/python/generated/pyarrow.Table.html) +containing all NEOWISE sources with an ra/dec falling within HEALPix order 5 pixel `pix`. +Use `pixel_tbl.to_pandas()` to convert the table to a pandas dataframe. + ++++ + +How big are the partitions? (see Appendix for details) + +```{code-cell} ipython3 +# median partition +median_part_tbl = neowise_ds.to_table( + filter=(KFIELD == median_part), columns=COLUMN_SUBSET +) +print_table_size(table=median_part_tbl, pixel_index=median_part) +``` + +Often only a few columns are needed for processing, so most partitions will fit comfortably in memory. +(The recommended maximum for an in-memory table/dataframe is typically 1GB, but there is +no strict upper limit -- performance will depend on the compute resources available.) + +However, beware that the largest partitions are quite large: + +```{code-cell} ipython3 +# biggest partition +# this is very large, so we'll restrict the number of columns to one +biggest_part_tbl = neowise_ds.to_table( + filter=(KFIELD == biggest_part), columns=COLUMN_SUBSET[:1] +) +print_table_size(table=biggest_part_tbl, pixel_index=biggest_part) + +# Additional filters can be included to reduce the number of rows if desired. +# Another option is to load individual files. +``` + +```{code-cell} ipython3 +# cleanup +del median_part_tbl +del biggest_part_tbl +``` + +## 5. Example: Slice by file + ++++ + +If you don't need data for all years at the same time, you may want to load individual files. +Generally, there is 1 file per partition per year, but a few partitions are as large as 6+ files per year. +Most of the files are about 0.3GB (compressed on disk) but about 1% are > 1GB. +Thus it should be reasonable to load at least a subset of columns for every row in a file. + ++++ + +The actual "slicing" here is done by using `neowise_ds` to access a +dataset [Fragment](https://arrow.apache.org/docs/python/generated/pyarrow.dataset.Fragment.html) +(`frag` in the code below) which represents a single file. 
+ +```{code-cell} ipython3 +# slice by file and iterate +for frag in neowise_ds.get_fragments(): + # load the slice to get every row in the file, subset of columns + file_tbl = frag.to_table(columns=COLUMN_SUBSET) + + # insert your code here to continue processing the file as desired + + # if you need to see which file this is, parse the path + print(f"file path: {frag.path}") + # let's see how much data this loaded + print_table_size(table=file_tbl) + + # again, we'll stop after one + del file_tbl + break +``` + +This can be combined with the previous example to iterate over the files in a single partition +(left as an exercise for the reader). + ++++ + +How big are the files? + +```{code-cell} ipython3 +# median file +median_file_frag = [ + frag for frag in neowise_ds.get_fragments() if frag.path == median_file +][0] +median_file_tbl = median_file_frag.to_table(columns=COLUMN_SUBSET) +print_table_size(table=median_file_tbl) +``` + +```{code-cell} ipython3 +# biggest file +biggest_file_frag = [ + frag for frag in neowise_ds.get_fragments() if frag.path == biggest_file +][0] +biggest_file_tbl = biggest_file_frag.to_table(columns=COLUMN_SUBSET) +print_table_size(table=biggest_file_tbl) +``` + +```{code-cell} ipython3 +# cleanup +del median_file_tbl +del biggest_file_tbl +``` + +## 6. Example: Slice by year + ++++ + +If you want to handle the years independently you can work with the per-year datasets. +We actually created these "slices" in the Setup section with `year_datasets`, and that +same list is now accessible in `neowise_ds.children` used below. +Any of the techniques shown in this notebook or those listed under "See also" can also +be applied to the per-year datasets. + +```{code-cell} ipython3 +# slice by year and iterate. zip with YEARS so that we know which slice this is. +for year, year_ds in zip(YEARS, neowise_ds.children): + # insert your code here to process year_ds as desired. + # filter and load, iterate over partitions or files, etc. + + # we'll just look at some basic metadata. + num_rows = sum(frag.metadata.num_rows for frag in year_ds.get_fragments()) + num_files = len(year_ds.files) + print(f"NEOWISE year {year} dataset: {num_rows:,} rows in {num_files:,} files") +``` + +## Appendix + ++++ + +### A.1 Considerations when extending to specific use cases + +Because the catalog is so large, you will need to carefully consider your specific problem and +determine how to slice and filter the data most efficiently. +There is no one right answer; it will depend on the use case. + +#### A.1.1 Filtering + +Filter out as much data as possible as early as possible. Ideas to consider are: + +1. With the Parquet file format, you can apply filters during the read to avoid loading + rows that you don't need. + - Pandas (not demonstrated here) supports basic filters. + - PyArrow (demonstrated here) also supports complex filters which allow you to compare + values between columns and/or construct new columns on the fly (e.g., subtracting + magnitude columns to construct a new color column, as done in the AllWISE notebook). +2. Queries (i.e., loading data by applying filters) will be *much* more efficient when they + include a filter on the partitioning column ('healpix_k5'; demonstrated above). + - Notice both that this is essentially equivalent to slicing by partition and that + you can filter for more than one partition at a time. + - This is highly recommended even if your use case doesn't explicitly care about it. 
     Exceptions include situations where you're working with individual files and when
     it's impractical or counterproductive for the science.
3. You should also include filters specific to your use case if possible.
4. Exceptions: Sometimes it's not easy to write a dataset filter for the query.
   A cone search is a common example.
   In principle it could be written as a PyArrow dataset filter, but in practice the correct
   formula is much too complicated.
   In this case, it's easier to write dataset filters for broad RA and Dec limits and then
   do the actual cone search using `astropy`.
   This approach is still quite performant (see the NEOWISE light curves notebook).

#### A.1.2 Slicing

Slice the dataset in some way(s), then iterate and/or parallelize over the slices. Ideas to consider are:

1. Choose your slices so that you can:
   - Run your processing code on one slice independently. For example, if your code must
     see all the data for some target object (RA and Dec) at the same time, you may
     slice the dataset by partition, but don't slice it by year.
   - Load all data in the slice into memory at once (after applying your filters during the read).
     This notebook shows how to determine how big a slice of data is in order to guide this decision.
2. By default, slice by partition. If this is too much data, you may also want to slice
   by year and/or file.
3. If you have enough memory to load more than one slice simultaneously, parallelize over
   the slices to speed up your code.

### A.2 Inspect dataset stats

+++

When deciding how to slice and filter the dataset, it can be useful to understand
dataset statistics like partition and file sizes.

```{code-cell} ipython3
def pixel_index_from_path(path, k_column=KCOLUMN):
    """Parse the path and return the partition pixel index.

    Parameters
    ----------
    path : str
        The path to parse.
    k_column : str (optional)
        Name of the partitioning column.

    Returns
    -------
    int
        The partition pixel index parsed from the path.
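
    Examples
    --------
    >>> pixel_index_from_path("healpix_k0=3/healpix_k5=3420/part0.snappy.parquet")
    3420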
+ """ + pattern = rf"({k_column}=)([0-9]+)" # matches strings like "healpix_k5=1124" + return int(re.search(pattern, path).group(2)) # pixel index, e.g., 1124 +``` + +```{code-cell} ipython3 +# load some file statistics to a dataframe +file_stats = pd.DataFrame( + columns=["path", KCOLUMN, "numrows"], + data=[ + (frag.path, pixel_index_from_path(frag.path), frag.metadata.num_rows) + for frag in neowise_ds.get_fragments() + ], +) +``` + +```{code-cell} ipython3 +file_stats.sample(5) +``` + +```{code-cell} ipython3 +file_stats.describe() +``` + +#### A.2.1 Dataset statistics per file + +```{code-cell} ipython3 +# visualize distribution of file sizes (number of rows) +ax = file_stats.numrows.hist(log=True) +ax.set_xlabel("Number of rows") +ax.set_ylabel("Number of files") +``` + +```{code-cell} ipython3 +# largest file +file_stats.loc[file_stats.numrows == file_stats.numrows.max()].head(1) +``` + +```{code-cell} ipython3 +# median file +file_stats.sort_values("numrows").iloc[len(file_stats.index) // 2] +``` + +#### A.2.2 Dataset statistics per partition + +```{code-cell} ipython3 +# get stats per partition +k_groups = file_stats[[KCOLUMN, "numrows"]].groupby(KCOLUMN) +per_part = k_groups.sum() +per_part["numfiles"] = k_groups.count() +``` + +```{code-cell} ipython3 +per_part.sample(5) +``` + +```{code-cell} ipython3 +per_part.describe() +``` + +```{code-cell} ipython3 +# visualize number of rows per partition +per_part.numrows.plot( + logy=True, xlabel=f"{KCOLUMN} pixel index", ylabel="Number of rows per partition" +) +``` + +```{code-cell} ipython3 +# largest partition +per_part.loc[per_part.numrows == per_part.numrows.max()] +``` + +```{code-cell} ipython3 +# median partition +per_part.sort_values("numrows").iloc[len(per_part.index) // 2] +``` + +----- + +About this notebook: + +- Author: Troy Raen (Applications Developer, IRSA) and the IPAC Science Platform team +- Contact: [https://irsa.ipac.caltech.edu/docs/help_desk.html](https://irsa.ipac.caltech.edu/docs/help_desk.html) +- Updated: 2024-08-08 diff --git a/tutorials/requirements.txt b/tutorials/requirements.txt index 3e752811..19fc53a6 100644 --- a/tutorials/requirements.txt +++ b/tutorials/requirements.txt @@ -3,6 +3,7 @@ numpy matplotlib astropy pyvo>=1.5 +astroquery scipy pyarrow>=10.0.1 hpgeom