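"""Trim a reference-catalog export down to the HTM shards that overlap a set of exposures.

Exposure footprints (bbox + WCS) are gathered from DECam FITS files on disk
(--paths) and/or from datasets in a Butler repo (--repo/--dataset/--collections).
The overlapping HTM shard IDs are resolved, and the result is written either as
a new-style import ECSV file (--import-file) or as a trimmed YAML export printed
to stdout.

Example invocation (the module path, refcat alias, and file paths are illustrative):

    python -m refcats.trim_refcat gaia --paths raw/*.fits --output data/refcats
"""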
import argparse
import logging
import os

import joblib
import yaml

from .utils import (
    create_bbox_and_wcs_from_decam_fits,
    deferred_import,
    load_refcat_yaml,
    make_refcat_import,
    refcat_name_to_dataset_name,
    resolve_bbox2shard_ids,
)

logger = logging.getLogger(__name__)
logging.basicConfig(format="[%(levelname)s:%(filename)s:%(lineno)s - %(funcName)5s()] %(message)s")


def main():
    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument("refcat_name", type=str, help="The reference catalog name (an alias mapped to a dataset name)")
    parser.add_argument("--output", "-o", type=str, default="data/refcats", help="The output directory for the import ECSV file; the YAML export is printed to stdout")
    parser.add_argument("--paths", "-p", nargs="+", type=str, default=[], help="Paths to DECam FITS files to search")
    parser.add_argument("--repo", "-b", type=str, default=None, help="The Butler repo to search for exposures")
    parser.add_argument("--dataset", default=None, help="The dataset type name to search if using --repo")
    parser.add_argument("--collections", default=None, help="The collections to search if using --repo")
    parser.add_argument("--where", default="", help="A constraint for the dataset search if using --repo")
    parser.add_argument("--refcat_indexer", default="HTM", help="The refcat indexer (only HTM is supported)")
    parser.add_argument("--pixel_margin", type=int, default=300, help="The pixel margin for determining overlapping refcat shards")
    parser.add_argument("--log-level", default="INFO", help="The logging level, one of DEBUG, INFO, WARN, ERROR")
    parser.add_argument("--export-run", type=str, default="refcats", help="The RUN collection name to export datasets into")
    parser.add_argument("--export-dataset-name", type=str, default=None, help="The dataset type name to use for exported datasets")
    parser.add_argument("--import-file", action="store_true", help="Write a new-style import ECSV file instead of a YAML export file")
    parser.add_argument("--processes", "-J", type=int, default=8, help="Number of processes to use for opening FITS files or loading dataset refs")

    args = parser.parse_args()
    if args.repo is not None and args.dataset is None:
        raise ValueError("--dataset is required when --repo is given")
    logger.setLevel(getattr(logging, args.log_level))

    os.makedirs(args.output, exist_ok=True)
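
    # Resolve the refcat alias to its Butler dataset name and configure the
    # dataset config.  Only the HTM indexer is supported; the resolved shard
    # IDs are HTM trixel IDs (the htm7 dimension in the export data IDs).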
    # deferred_import binds the measAlgs name into this module's globals.
    deferred_import("lsst.meas.algorithms", "measAlgs", ns=globals())
    refCatConf = measAlgs.DatasetConfig()
    ref_dataset_name = refcat_name_to_dataset_name.get(args.refcat_name, None)
    if ref_dataset_name is None:
        raise ValueError(f"{args.refcat_name} is not an alias for any dataset name, use one of {list(refcat_name_to_dataset_name.keys())}")
    refCatConf.ref_dataset_name = ref_dataset_name
    if args.refcat_indexer != "HTM":
        raise ValueError(f"refcat indexer {args.refcat_indexer} is not supported")
    refCatConf.indexer = args.refcat_indexer

    if args.export_dataset_name is None:
        export_dataset_name = ref_dataset_name
    else:
        export_dataset_name = args.export_dataset_name
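
    # Gather one (bbox, wcs) footprint per detector.  FITS files and a Butler
    # repo can be combined: both branches below append to the same lists.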
    bboxes = []
    wcss = []
    if args.paths:
        def work(path):
            # joblib workers run in separate processes, so logging has to be
            # (re)configured in each worker rather than inherited from the parent.
            import logging
            logging.basicConfig()
            logger = logging.getLogger(__name__)
            logger.setLevel(getattr(logging, args.log_level))
            logger.info(f"loading fits {path}")
            return create_bbox_and_wcs_from_decam_fits(path)

        results = joblib.Parallel(n_jobs=args.processes)(joblib.delayed(work)(path) for path in args.paths)
        # create_bbox_and_wcs_from_decam_fits returns one entry per detector,
        # hence extend() rather than append().
        for bbox, wcs in results:
            bboxes.extend(bbox)
            wcss.extend(wcs)
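
    # Alternatively (or additionally), derive footprints from the wcs and
    # detector components of datasets in an existing Butler repo.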
    if args.repo:
        deferred_import("lsst.daf.butler", "dafButler", ns=globals())
        butler = dafButler.Butler(args.repo, collections=args.collections)
        refs = butler.registry.queryDatasets(args.dataset, where=args.where)

        def work(ref):
            # As above, configure logging inside the worker process.
            import logging
            logging.basicConfig()
            logger = logging.getLogger(__name__)
            logger.setLevel(getattr(logging, args.log_level))
            logger.info(f"loading dataset {ref}")

            # Read only the wcs and detector components, not the full dataset.
            wcs = butler.get(f"{args.dataset}.wcs", ref.dataId, collections=ref.run)
            bbox = butler.get(f"{args.dataset}.detector", ref.dataId, collections=ref.run).getBBox()
            return bbox, wcs

        results = joblib.Parallel(n_jobs=args.processes)(joblib.delayed(work)(ref) for ref in refs)
        for bbox, wcs in results:
            bboxes.append(bbox)
            wcss.append(wcs)
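
    # Map each footprint, padded by --pixel_margin pixels, to the set of
    # overlapping refcat shard IDs.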
    shards = []
    for bbox, wcs in zip(bboxes, wcss):
        shards.extend(resolve_bbox2shard_ids(refCatConf, bbox, wcs, pixelMargin=args.pixel_margin))

    # Deduplicate; sorted for stable, reproducible output.
    shards = sorted(set(shards))
    logger.info("shards: %s", shards)
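
    # Two output modes: write a new-style ECSV import file into --output, or
    # print a trimmed copy of the full YAML export to stdout.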
    if args.import_file:
        import_table = make_refcat_import(ref_dataset_name, shards, args.output)
        import_table.write(os.path.join(args.output, ref_dataset_name + ".ecsv"), format="ascii.ecsv")
    else:
        # Load the full YAML export and trim it to just the chosen shards.
        logger.info(f"loading refcat for {ref_dataset_name}")
        refcat = load_refcat_yaml(ref_dataset_name)
        datasets = next(d for d in refcat['data'] if d['type'] == "dataset")
        collection = next(d for d in refcat['data'] if d['type'] == "collection")
        dataset_type = next(d for d in refcat['data'] if d['type'] == "dataset_type")
        collection['name'] = args.export_run
        dataset_type['name'] = export_dataset_name
        logger.info("trimming records")
        # The export's data IDs are keyed by the htm7 dimension (HTM depth 7).
        records = [rec for rec in datasets['records'] if rec['data_id'][0]['htm7'] in shards]
        datasets['records'] = records
        datasets['run'] = args.export_run
        datasets['dataset_type'] = export_dataset_name
        refcat['data'] = [collection, dataset_type, datasets]

        print(yaml.dump(refcat))


if __name__ == "__main__":
    main()