diff --git a/cluster_provisioning/modules/common/variables.tf b/cluster_provisioning/modules/common/variables.tf index 294c83db5..342f9e9f1 100644 --- a/cluster_provisioning/modules/common/variables.tf +++ b/cluster_provisioning/modules/common/variables.tf @@ -666,6 +666,54 @@ variable "queues" { "use_private_vpc" = false "use_on_demand" = false } + "opera-job_worker-rtc_for_dist_data_query_hist" = { + "name" = "opera-job_worker-rtc_for_dist_data_query_hist" + "instance_type" = ["m8a.large", "m8i-flex.large", "m8i.large", "m7a.large", "m7i-flex.large", "m6i.large", "m6a.large", "m5.large", "m5a.large"] + "user_data" = "launch_template_user_data.sh.tmpl" + "root_dev_size" = 50 + "data_dev_size" = 25 + "min_size" = 0 + "max_size" = 100 + "total_jobs_metric" = false + "use_private_vpc" = false + "use_on_demand" = true + } + "opera-job_worker-dist_s1_hist_on_first" = { + "name" = "opera-job_worker-dist_s1_hist_on_first" + "instance_type" = ["m8a.large", "m8i-flex.large", "m8i.large", "m7a.large", "m7i-flex.large", "m6i.large", "m6a.large", "m5.large", "m5a.large"] + "user_data" = "launch_template_user_data.sh.tmpl" + "root_dev_size" = 50 + "data_dev_size" = 25 + "min_size" = 0 + "max_size" = 100 + "total_jobs_metric" = false + "use_private_vpc" = false + "use_on_demand" = true + } + "opera-job_worker-dist_s1_hist_on_publication" = { + "name" = "opera-job_worker-dist_s1_hist_on_publication" + "instance_type" = ["m8a.large", "m8i-flex.large", "m8i.large", "m7a.large", "m7i-flex.large", "m6i.large", "m6a.large", "m5.large", "m5a.large"] + "user_data" = "launch_template_user_data.sh.tmpl" + "root_dev_size" = 50 + "data_dev_size" = 25 + "min_size" = 0 + "max_size" = 100 + "total_jobs_metric" = false + "use_private_vpc" = false + "use_on_demand" = true + } + "opera-job_worker-dist_s1_hist_on_complete" = { + "name" = "opera-job_worker-dist_s1_hist_on_complete" + "instance_type" = ["m8a.large", "m8i-flex.large", "m8i.large", "m7a.large", "m7i-flex.large", "m6i.large", 
"m6a.large", "m5.large", "m5a.large"] + "user_data" = "launch_template_user_data.sh.tmpl" + "root_dev_size" = 50 + "data_dev_size" = 25 + "min_size" = 0 + "max_size" = 100 + "total_jobs_metric" = false + "use_private_vpc" = false + "use_on_demand" = true + } "opera-job_worker-cslc_data_query" = { "name" = "opera-job_worker-cslc_data_query" "instance_type" = ["c6i.xlarge", "m6a.xlarge", "c6a.xlarge", "c5a.xlarge", "r7i.xlarge", "c7i.xlarge"] @@ -786,6 +834,18 @@ variable "queues" { "use_private_vpc" = false "use_on_demand" = false } + "opera-job_worker-rtc_for_dist_data_download_hist" = { + "name" = "opera-job_worker-rtc_for_dist_data_download_hist" + "instance_type" = ["c6in.large", "c5n.large", "m6in.large", "m5n.large"] + "user_data" = "launch_template_user_data.sh.tmpl" + "root_dev_size" = 50 + "data_dev_size" = 25 + "min_size" = 0 + "max_size" = 50 + "total_jobs_metric" = true + "use_private_vpc" = false + "use_on_demand" = false + } "opera-job_worker-ecmwf-merger" = { "name" = "opera-job_worker-ecmwf-merger" "instance_type" = ["r5a.4xlarge", "r6a.4xlarge", "r5.4xlarge", "r6i.4xlarge", "r7i.4xlarge", "r7a.4xlarge", "r6a.2xlarge"] diff --git a/conf/sds/files/datasets.json b/conf/sds/files/datasets.json index 78a16c6d1..abef7b638 100644 --- a/conf/sds/files/datasets.json +++ b/conf/sds/files/datasets.json @@ -511,6 +511,14 @@ "s3://{{ DATASET_S3_ENDPOINT }}:80/{{ DATASET_BUCKET }}/browse/inputs/GSLC_NI/{id}" ] } + }, + { + "ipath": "hysds::data/DIST_S1-state-config", + "level": "STATE-CONFIG", + "type": "DIST_S1-STATE-CONFIG", + "match_pattern": "/(?PDIST_S1_state-config_.+)$", + "alt_match_pattern": null, + "extractor": null } ] } diff --git a/conf/sds/rules/user_rules.json b/conf/sds/rules/user_rules.json index 2d78b9f0e..ea1a7fee8 100644 --- a/conf/sds/rules/user_rules.json +++ b/conf/sds/rules/user_rules.json @@ -111,6 +111,48 @@ "username": "hysdsops", "workflow": "hysds-io-SCIFLO_L3_DSWx_HLS:__TAG__", "job_spec": "job-SCIFLO_L3_DSWx_HLS:__TAG__" + }, + { + 
"enabled": true, + "job_type": "hysds-io-rtc_for_dist_query_dist_on_pub:__TAG__", + "kwargs": "{}", + "passthru_query": false, + "priority": 0, + "query_all": false, + "query_string": "{\"bool\": {\"must\": [{\"term\": {\"dataset_type.keyword\": \"L3_DIST_S1\"}}], \"must_not\": [{\"term\": {\"metadata.restaged\": \"true\"}}]}}", + "queue": "opera-job_worker-dist_s1_hist_on_publication", + "rule_name": "trigger-rtc_for_dist_query_dist_on_pub", + "username": "hysdsops", + "workflow": "hysds-io-rtc_for_dist_query_dist_on_pub:__TAG__", + "job_spec": "job-rtc_for_dist_query_dist_on_pub:__TAG__" + }, + { + "enabled": true, + "job_type": "hysds-io-rtc_for_dist_query_sc_on_first:__TAG__", + "kwargs": "{}", + "passthru_query": false, + "priority": 0, + "query_all": false, + "query_string": "{\"bool\": {\"must\": [{\"term\": {\"dataset_type.keyword\": \"DIST_S1-STATE-CONFIG\"}}, {\"term\": {\"metadata.first\": true}}],\"must_not\": [{\"term\": {\"metadata.is_complete\": true}}]}}", + "queue": "opera-job_worker-dist_s1_hist_on_first", + "rule_name": "trigger-rtc_for_dist_query_sc_on_first", + "username": "hysdsops", + "workflow": "hysds-io-rtc_for_dist_query_sc_on_first:__TAG__", + "job_spec": "job-rtc_for_dist_query_sc_on_first:__TAG__" + }, + { + "enabled": true, + "job_type": "hysds-io-rtc_for_dist_query_sc_on_complete:__TAG__", + "kwargs": "{}", + "passthru_query": false, + "priority": 0, + "query_all": false, + "query_string": "{\"bool\": {\"must\": [{\"term\": {\"dataset_type.keyword\": \"DIST_S1-STATE-CONFIG\"}}, {\"term\": {\"metadata.is_complete\": true}}]}}", + "queue": "opera-job_worker-dist_s1_hist_on_complete", + "rule_name": "trigger-rtc_for_dist_query_sc_on_complete", + "username": "hysdsops", + "workflow": "hysds-io-rtc_for_dist_query_sc_on_complete:__TAG__", + "job_spec": "job-rtc_for_dist_query_sc_on_complete:__TAG__" } ], "mozart": [] diff --git a/data_subscriber/cslc_utils.py b/data_subscriber/cslc_utils.py index bd7996a55..24342e232 100644 --- 
a/data_subscriber/cslc_utils.py +++ b/data_subscriber/cslc_utils.py @@ -242,17 +242,21 @@ def process_frame_geo_json(file): return frame_geo_map def parse_r2_product_file_name(native_id, product_type): + match_product_id = _datasets_json_match(product_type, native_id) + burst_id = match_product_id.group("burst_id") # e.g. T074-157286-IW3 (for RTC and CSLC) + acquisition_dts = match_product_id.group("acquisition_ts") # e.g. 20210705T183117Z + return burst_id, acquisition_dts +# TODO chrisjrd: move to dataset_util.py or similar +def _datasets_json_match(product_type, native_id): dataset_json = datasets_json_util.DatasetsJson() cslc_granule_regex = dataset_json.get(product_type)["match_pattern"] match_product_id = re.match(cslc_granule_regex, native_id) if not match_product_id: raise ValueError(f"{product_type} native ID {native_id} could not be parsed with regex from datasets.json") + return match_product_id - burst_id = match_product_id.group("burst_id") # e.g. T074-157286-IW3 (for RTC and CSLC) - acquisition_dts = match_product_id.group("acquisition_ts") # e.g. 
20210705T183117Z - return burst_id, acquisition_dts def parse_cslc_file_name(native_id): return parse_r2_product_file_name(native_id, "L2_CSLC_S1") diff --git a/data_subscriber/daac_data_subscriber.py b/data_subscriber/daac_data_subscriber.py index 7979b27f9..31c040810 100755 --- a/data_subscriber/daac_data_subscriber.py +++ b/data_subscriber/daac_data_subscriber.py @@ -56,7 +56,7 @@ def run(argv: list[str]): es_conn = supply_es_conn(args) - logger.debug(f"daac_data_subscriber.py invoked with {args=}") + logger.info(f"daac_data_subscriber.py invoked with {args=}") job_id = supply_job_id() logger.debug(f"Using {job_id=}") diff --git a/data_subscriber/dist_s1_utils.py b/data_subscriber/dist_s1_utils.py index ab287eb4f..6a3aaeec8 100644 --- a/data_subscriber/dist_s1_utils.py +++ b/data_subscriber/dist_s1_utils.py @@ -87,21 +87,21 @@ def process_dist_burst_db(file): logger.info(f"Processing {df.shape[0]} rows in the DIST-S1 burst database file...") # Create a dictionary of tile ids and the products that are associated with them - for index, row in df.iterrows(): + for row in df.itertuples(): #print(row['mgrs_tile_id'], row['acq_group_id_within_mgrs_tile']) - tile_id = row['mgrs_tile_id'] - unique_acquisition = row['acq_group_id_within_mgrs_tile'] + tile_id = row.mgrs_tile_id + unique_acquisition = row.acq_group_id_within_mgrs_tile product_id = tile_id + "_" + str(unique_acquisition) if product_id not in dist_products[tile_id]: dist_products[tile_id].add(product_id) - jpl_burst_id = row['jpl_burst_id'] + jpl_burst_id = row.jpl_burst_id bursts_to_products[jpl_burst_id].add(product_id) product_to_bursts[product_id].add(jpl_burst_id) if jpl_burst_id in all_burst_ids: rtc_bursts_reused += 1 - all_burst_ids.add(row['jpl_burst_id']) + all_burst_ids.add(row.jpl_burst_id) print(f"Total of {len(all_burst_ids)} unique RTC bursts in this database file.") print(f"RTC Bursts were reused {rtc_bursts_reused} times in this database file.") diff --git 
a/data_subscriber/rtc_for_dist/rtc_for_dist_query.py b/data_subscriber/rtc_for_dist/rtc_for_dist_query.py index 86757eea2..a1941537b 100644 --- a/data_subscriber/rtc_for_dist/rtc_for_dist_query.py +++ b/data_subscriber/rtc_for_dist/rtc_for_dist_query.py @@ -3,16 +3,17 @@ import json import operator import re +import sys from collections import Counter, defaultdict from copy import deepcopy from datetime import datetime, timedelta from itertools import chain from os.path import basename -from typing import Union +from typing import Union, Literal import dateutil -from dateutil.parser import isoparse -from more_itertools import one, only, first +import opensearchpy +from more_itertools import one, first from data_subscriber.cmr import CMR_TIME_FORMAT, async_query_cmr from data_subscriber.cslc_utils import save_blocked_download_job, parse_r2_product_file_name @@ -23,8 +24,11 @@ from data_subscriber.es_conn_util import get_document_timestamp_min_max from data_subscriber.query import BaseQuery, DateTimeRange from data_subscriber.rtc_for_dist.dist_dependency import DistDependency, CMR_RTC_CACHE_INDEX +from dist_s1.dataset_util import create_dataset, create_ds_dataset_json, write_ds_dataset_json, write_ds_met_json +from opera_commons.es_connection import get_grq_es from rtc_utils import rtc_granule_regex, dedupe_rtc, rtc_product_file_regex from tools.populate_cmr_rtc_cache import populate_cmr_rtc_cache, parse_rtc_granule_metadata +from util.grq_client import get_body, get_range from util.job_submitter import try_submit_mozart_job EARLIEST_POSSIBLE_RTC_DATE = "2016-01-01T00:00:00Z" @@ -37,9 +41,9 @@ def __init__(self, args, token, es_conn, cmr, job_id, settings, dist_s1_burst_db super().__init__(args, token, es_conn, cmr, job_id, settings) if dist_s1_burst_db_file: - self.dist_products, self.bursts_to_products, self.product_to_bursts, self.all_tile_ids = process_dist_burst_db(dist_s1_burst_db_file) + self.dist_products, self.bursts_to_products, self.product_to_bursts, _ = 
process_dist_burst_db(dist_s1_burst_db_file) else: - self.dist_products, self.bursts_to_products, self.product_to_bursts, self.all_tile_ids = localize_dist_burst_db() + self.dist_products, self.bursts_to_products, self.product_to_bursts, _ = localize_dist_burst_db() # self.grace_mins = args.grace_mins if args.grace_mins else settings["DIST_S1_TRIGGERING"]["DEFAULT_DIST_S1_QUERY_GRACE_PERIOD_MINUTES"] self.grace_mins = args.grace_mins if args.grace_mins is not None else settings["DIST_S1_TRIGGERING"]["DEFAULT_DIST_S1_QUERY_GRACE_PERIOD_MINUTES"] @@ -47,19 +51,14 @@ def __init__(self, args, token, es_conn, cmr, job_id, settings, dist_s1_burst_db self.dist_dependency = DistDependency(self.logger, self.dist_products, self.bursts_to_products, self.product_to_bursts, settings) + self.batch_id_to_current_granules = {} '''This map is set by determine_download_granules and consumed by download_job_submission_handler We're taking this indirect approach instead of just passing this through to work w the current class structure''' - self.batch_id_to_current_granules = {} self.download_batch_id_to_k_granules = {} - self.settings = settings - self.force_product_id = None - self.window_delta_days = args.window_delta if args.window_delta else settings["DIST_S1_TRIGGERING"]["DEFAULT_DIST_S1_WINDOW_DELTA_DAYS"] - self.forced_product_id_to_current_granules = {} - self.download_batch_id_to_job_submittable = {} def validate_args(self): @@ -81,7 +80,9 @@ def unique_latest_granules(self, granules): return list(granules_dict.values()) def query_cmr(self, timerange, now: datetime): - if self.args.proc_mode == "forward": + self.logger.info(f"{self.args.proc_mode=}") + self.logger.info(f"{self.args.product_id_time=}") + if self.args.proc_mode == "forward" or (self.args.proc_mode == "historical" and not self.args.product_id_time): # "Normal" query for granules granules = super().query_cmr(timerange, now) @@ -134,7 +135,7 @@ def query_cmr(self, timerange, now: datetime): else: 
self.logger.warning(f"Not inserting granules into cmr_rtc_cache because use_temporal is True") - elif self.args.proc_mode == "reprocessing": + elif self.args.proc_mode == "reprocessing" or (self.args.proc_mode == "historical" and self.args.product_id_time): granules = [] #TODO: We can switch over to this code if we want to trigger reprocessing by RTC granule_id @@ -145,16 +146,16 @@ def query_cmr(self, timerange, now: datetime): self.logger.info(f"Reprocessing burst_id {burst_id} with product_ids {product_ids}")''' #TODO: We probably want something more graceful than the product_id_time looking like 31SGR_3,20231217T053132Z - product_ids = [self.args.product_id_time.split(",")[0]] - acquisition_dts = self.args.product_id_time.split(",")[1] + # TODO: The fact that this is a loop makes sense if we ever decide to trigger by native_id instead of product_id_time + for pit in self.args.product_id_time.split("-"): + product_id = pit.split(",")[0] + acquisition_dts = pit.split(",")[1] - acquisition_time = datetime.strptime(acquisition_dts, "%Y%m%dT%H%M%SZ") - start_time = (acquisition_time - timedelta(minutes=10)).strftime(CMR_TIME_FORMAT) - end_time = (acquisition_time + timedelta(minutes=10)).strftime(CMR_TIME_FORMAT) - query_timerange = DateTimeRange(start_time, end_time) + acquisition_time = datetime.strptime(acquisition_dts, "%Y%m%dT%H%M%SZ") + start_time = (acquisition_time - timedelta(minutes=10)).strftime(CMR_TIME_FORMAT) + end_time = (acquisition_time + timedelta(minutes=10)).strftime(CMR_TIME_FORMAT) + query_timerange = DateTimeRange(start_time, end_time) - # TODO: The fact that this is a loop makes sense if we ever decide to trigger by native_id instead of product_id_time - for product_id in product_ids: self.force_product_id = product_id #TODO: This needs to change if we change this code back to using granule_id instead of product_id new_args = deepcopy(self.args) new_args.use_temporal = True @@ -168,10 +169,6 @@ def query_cmr(self, timerange, now: datetime): 
g["product_id"] = product_id # force product_id because one granule can belong to multiple products granules.extend(gs) - elif self.args.proc_mode == "historical": - self.logger.error("Historical processing mode is not supported for RTC for DIST products.") - granules = [] - # Remove granules whose burst_id is not in the burst database filtered_granules = [] for granule in granules: @@ -212,7 +209,7 @@ def determine_download_granules(self, granules): rtc_granule_dict_add(granules_dict, granules) # Get unsubmitted granules, which are forward-processing ES records without download_job_id fields - if not self.args.product_id_time: + if self.args.proc_mode != "historical" or not self.args.product_id_time: self.refresh_index() unsubmitted = self.es_conn.get_unsubmitted_granules() self.logger.info("len(unsubmitted)=%d", len(unsubmitted)) @@ -232,7 +229,131 @@ def determine_download_granules(self, granules): granules_to_download.append(granules_dict[(unique_rtc_id, batch_id)]) self.batch_id_to_current_granules = batch_id_to_current_granules + if self.args.proc_mode == "historical" and not self.args.product_id_time: + # if self.args.proc_mode == "forward" or self.args.product_id_time: + # DRAFT STATE-CONFIG LOCALLY + + def search(grq_es, body): + try: + results = grq_es.search(body=body, index="grq_1.0_dist_s1-state-config") + except opensearchpy.exceptions.NotFoundError as e: + return [] + return results["hits"]["hits"] + def state_configs_by_tile( + tile_id, + start_dt_iso="1970-01-01", end_dt_iso="9999-12-31T23:59:59.999", + gt: Literal["gt", "gte"] = "gte", lt: Literal["lt", "gte"] = "lte", + order: Literal["asc", "desc"] = "asc" + ): + grq_es = get_grq_es() + body = get_body(match_all=False) + body["query"]["bool"]["must"].append({"match": {"metadata.tile_id": tile_id}}) + body["query"]["bool"]["must"].append(get_range(datetime_fieldname="acquisition_ts", gt=gt, start_dt_iso=start_dt_iso, lt=lt, end_dt_iso=end_dt_iso)) + body["sort"] = [{"@timestamp": {"order": 
order}}] + return search(grq_es, body) + def exists_state_config(batch_id): + return not not state_configs_by_batch_id(batch_id) + def state_configs_by_batch_id(batch_id): + grq_es = get_grq_es() + body = get_body(match_all=False) + body["query"]["bool"]["must"].append({"match": {"metadata.batch_id": batch_id}}) + return search(grq_es, body) + + product_id_time_to_state_config_ds_met_json = {} + product_id_time_to_batch_id = {} + tile_to_product_id_times = defaultdict(set) + def acq_time_from_product_id_time(p): + _, acquisition_dts = p.split(",") + return acquisition_dts + for batch_id, batch_granules in batch_id_to_current_granules.items(): + burst_id, acquisition_dts = parse_r2_product_file_name(batch_granules[0]["granule_id"], "L2_RTC_S1") + products = self.bursts_to_products[burst_id] + self.logger.error(f"{len(products)=}") + + # collect all product-id-times associated with this burst/batch (local) + batch_id_tile_id = batch_id.split("_")[0].removeprefix("p") + product_id_times = set() + for product in products: + product_tile_id = product.split("_")[0] + if product_tile_id != batch_id_tile_id: + continue + product_id_time = f"{product},{acquisition_dts}" + product_id_times.add(product_id_time) + product_id_times = sorted(product_id_times, key=acq_time_from_product_id_time) + + # collect all product-id-times in this historical timerange (global) + for product_id_time in product_id_times: + product_id_time_to_batch_id[product_id_time] = batch_id + + # group all product-id-times by tile + for product_id_time in product_id_times: + tile_id = product_id_time.split(",")[0].split("_")[0] + tile_to_product_id_times[tile_id].add(product_id_time) + + tile_to_product_id_times = dict(tile_to_product_id_times) + for k in tile_to_product_id_times: + tile_to_product_id_times[k] = sorted(tile_to_product_id_times[k], key=acq_time_from_product_id_time) + + for tile_id, product_id_times in tile_to_product_id_times.items(): + # draft state-config jsons + if 
len(product_id_times) == 1: + product_id_times_pairwise = zip(product_id_times, [None]) + else: + product_id_times_pairwise = zip(product_id_times, product_id_times[1:] + [None]) + + first_batch_id = product_id_time_to_batch_id[first(product_id_times)] + + for product_id_time, next_product_id_time in product_id_times_pairwise: + batch_id = product_id_time_to_batch_id[product_id_time] + dataset_id = f"DIST_S1_state-config_{batch_id}" + product, acquisition_dts = product_id_time.split(",") + tile_id = product.split("_")[0] + product_id_time_to_state_config_ds_met_json[product_id_time] = { + "id": dataset_id, + "batch_id": batch_id, + "status": "queued", + "product_id_time": product_id_time, + "next_product_id_time": next_product_id_time or "NULL", + "product_id": product, + "tile_id": tile_id, + "acquisition_ts": acquisition_dts, + "first": batch_id == first_batch_id, + } + + self.logger.info(f"{product_id_time_to_state_config_ds_met_json=}") + self.logger.info(f"{product_id_time_to_batch_id=}") + self.logger.info(f"{tile_to_product_id_times=}") + + # write out all state-configs to "queue" them + for _, ds_met_json in product_id_time_to_state_config_ds_met_json.items(): + # create state-config + dataset_id = ds_met_json["id"] + batch_id = ds_met_json["batch_id"] + + ds_dataset_json = create_ds_dataset_json(version="1.0") + ds_dataset_json_path = write_ds_dataset_json(ds_dataset_json, dataset_id) + ds_met_json_path = write_ds_met_json(ds_met_json, dataset_id) + dataset_dir = create_dataset(dataset_id=dataset_id, ds_dataset_json=ds_dataset_json_path, ds_met_json=ds_met_json_path, dataset_type="DIST_S1-STATE-CONFIG") + + # sort within groups chronologically to establish the chain + tile_to_product_id_times = dict(tile_to_product_id_times) + for t in tile_to_product_id_times: + tile_to_product_id_times[t] = sorted(tile_to_product_id_times[t], key=acq_time_from_product_id_time) + + # gather the first batch in each chain, to allow processing to continue + 
first_time_batch_id_to_current_granules = {} + for _, pits in tile_to_product_id_times.items(): + first_batch_id = product_id_time_to_batch_id[first(pits)] + first_time_batch_id_to_current_granules[first_batch_id] = batch_id_to_current_granules[first_batch_id] + + self.logger.info("HISTORICAL MODE (SUBMISSION). Only processing first-time products.") + batch_id_to_current_granules = first_time_batch_id_to_current_granules + + self.logger.info("Exiting early to start historical mode processing chains.") + sys.exit(0) + batch_id_to_current_granules_count = {} + self.logger.error(f"{len(batch_id_to_current_granules)=}") for k in batch_id_to_current_granules: batch_id_to_current_granules_count[k] = len(batch_id_to_current_granules[k]) self.logger.info(f"{batch_id_to_current_granules_count=}") @@ -589,8 +710,6 @@ def add_filtered_urls(granule, filtered_urls: list, polarization_preference: Uni continue product_metadata["baseline_s3_paths"] = sorted(batch_id_to_baseline_urls[batch_id]) - product_type = "rtc_for_dist" - job_name = f"job-WF-{product_type}_download-{chunk_batch_ids[0]}" # If the previous run for this tile has not been processed, submit as a pending job # previous_tile_product_file_paths can be None or a list of file paths @@ -607,6 +726,8 @@ def add_filtered_urls(granule, filtered_urls: list, polarization_preference: Uni add_attributes = {"previous_tile_job_id": previous_tile_job_id, "download_batch_id": batch_id, "acquisition_ts": acquisition_ts} + product_type = "rtc_for_dist" + job_name = f"job-WF-{product_type}_download-{chunk_batch_ids[0]}" if should_wait: self.logger.info( f"We will wait for the previous run for the job {previous_tile_job_id} to complete before submitting the download job.") @@ -622,7 +743,7 @@ def add_filtered_urls(granule, filtered_urls: list, polarization_preference: Uni rule_name=f"trigger-{product_type}_download", job_spec=f"job-{product_type}_download:{self.settings['RELEASE_VERSION']}", job_type=f"{product_type}_download", - 
job_name=f"job-WF-{product_type}_download-{chunk_batch_ids[0]}") + job_name=job_name) # Record download job id in ES self.es_conn.mark_download_job_id(batch_id, download_job_id) diff --git a/data_subscriber/rtc_for_dist/rtc_for_dist_query.sh b/data_subscriber/rtc_for_dist/rtc_for_dist_query.sh index d02ba66dd..53f356eb8 100755 --- a/data_subscriber/rtc_for_dist/rtc_for_dist_query.sh +++ b/data_subscriber/rtc_for_dist/rtc_for_dist_query.sh @@ -19,7 +19,7 @@ echo "##########################################" echo "Running job to query ASFDAAC RTC data for DIST-S1 production" date -python $OPERA_HOME/data_subscriber/daac_data_subscriber.py query --collection-shortname=OPERA_L2_RTC-S1_V1 --product=DIST_S1 $* > run_job.log 2>&1 +python $OPERA_HOME/data_subscriber/daac_data_subscriber.py query --collection-shortname=OPERA_L2_RTC-S1_V1 --product=DIST_S1 ${*} > run_job.log 2>&1 if [ $? -eq 0 ]; then echo "Finished running job" diff --git a/dist_s1/__init__.py b/dist_s1/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/dist_s1/dataset_util.py b/dist_s1/dataset_util.py new file mode 100644 index 000000000..9ab7d8496 --- /dev/null +++ b/dist_s1/dataset_util.py @@ -0,0 +1,118 @@ +import json +import logging +import os +import re +from os import PathLike +from pathlib import PurePath, Path +from shutil import move +from typing import Union + +from util.datasets_json_util import DatasetsJson +from util.job_util import is_running_outside_verdi_worker_context + +logger = logging.getLogger(__name__) + + +def create_dataset(dataset_id: str, ds_dataset_json: Union[str, PathLike[str]], ds_met_json: Union[str, PathLike[str]] = None, dataset_type: str = None): + """ + Creates a dataset, including creating the necessary directory and listing. + :return: the path of the directory representing the new dataset. The name of the directory is prefixed by the dataset ID. 
+ """ + validate_dataset_type(dataset_type) + validate_dataset_id(dataset_id, dataset_type) + return _create_dataset_dir(dataset_id, ds_dataset_json, ds_met_json=ds_met_json, dataset_type=dataset_type) + + +def _create_dataset_dir(dataset_id: str, ds_dataset_json: Union[str, PathLike[str]], ds_met_json: Union[str, PathLike[str]] = None, dataset_type: str = None): + """ + Creates the dataset directory, moving the specified input files into it. + :return: the dataset directory path. + """ + make_dataset_dir(dataset_id, dataset_type) + move(ds_dataset_json, PurePath(dataset_id) / f"{dataset_id}.dataset.json") + if ds_met_json: + move(ds_met_json, PurePath(dataset_id) / f"{dataset_id}.met.json") + return Path(dataset_id).resolve() + + +def make_dataset_dir(dataset_id, dataset_type: str = None): + validate_dataset_id(dataset_id, dataset_type) + os.makedirs(dataset_id, exist_ok=True) + return Path(dataset_id).resolve() + + +def validate_dataset_type(dataset_type): + if is_running_outside_verdi_worker_context(): + datasets_json = DatasetsJson() + else: + datasets_json = DatasetsJson(file="datasets.json") + try: + datasets_json.get(dataset_type) + except KeyError as e: + raise Exception(f"Invalid {dataset_type=}. 
Compare against datasets.json") from e + + +def is_valid_dataset_type(dataset_type = None): + if is_running_outside_verdi_worker_context(): + datasets_json = DatasetsJson() + else: + datasets_json = DatasetsJson(file="datasets.json") + try: + datasets_json.get(dataset_type) + return True + except KeyError: + return False + + +def validate_dataset_id(dataset_id, dataset_type=None): + validate_dataset_type(dataset_type) + + datasets_json = DatasetsJson(file="datasets.json") + dataset_regex = datasets_json.get(dataset_type)["match_pattern"] + dataset_regex = dataset_regex.removeprefix("/") + match = re.fullmatch(dataset_regex, dataset_id) + # match = re.match(dataset_regex, dataset_id) + if not match: + raise ValueError(f"{dataset_type=} {dataset_id=} not a valid dataset. Compare against datasets.json") + + +def create_ds_dataset_json(version=None, label=None, location=None, starttime=None, endtime=None): + """ + Create the JSON contents of a dataset's dataset.json file, typically named `.dataset.json` + :param version: the dataset's version. Typically, a string-encoded float, with an optional "v" prefix. E.g. "1.0" or "v1.0" + :poram label: Tosca label + :param location: GeoJSON geometry -formated JSON. + :param starttime: Used in Tosca table view. + :param endtime: Used in Tosca table view. + """ + ds_dataset_dict = {} + req_fields = { + **({"version": version} if version else {}) + } + ds_dataset_dict.update(req_fields) + + opt_fields = { + **({"label": label} if label else {}), + **({"location": location} if location else {}), + **({"starttime": starttime} if starttime else {}), + **({"endtime":endtime} if endtime else {}) + } + ds_dataset_dict.update(opt_fields) + + return ds_dataset_dict + + +def write_ds_met_json(dataset_metadata, dataset_id: str, output_dir: str = None) -> Path: + output_dir = output_dir if output_dir else "." 
+ ds_met_json_path = Path(output_dir, f"{dataset_id}.met.json").resolve() + with ds_met_json_path.open("w") as fp: + json.dump(dataset_metadata, fp) + return ds_met_json_path + + +def write_ds_dataset_json(dataset_metadata: dict, dataset_id: str, output_dir: str = None) -> Path: + output_dir = output_dir if output_dir else "." + ds_dataset_json_path = Path(output_dir, f"{dataset_id}.dataset.json").resolve() + with ds_dataset_json_path.open("w") as fp: + json.dump(dataset_metadata, fp) + return ds_dataset_json_path diff --git a/dist_s1/state_config_producer.py b/dist_s1/state_config_producer.py new file mode 100644 index 000000000..bd618548d --- /dev/null +++ b/dist_s1/state_config_producer.py @@ -0,0 +1,305 @@ +"""This script handles the DIST-S1 state config as part of DIST-S1 historical processing""" + +import argparse +import base64 +import json +import sys +import tempfile +from datetime import datetime +from functools import partial +from pathlib import Path + +import opensearchpy +from more_itertools import one, only + +from dist_s1.dataset_util import (create_dataset, create_ds_dataset_json, write_ds_dataset_json, write_ds_met_json) +from opera_commons.es_connection import get_grq_es +from opera_commons.logger import get_logger, configure_library_loggers +from util.conf_util import SettingsConf +from util.ctx_util import JobContext +from util.exec_util import exec_wrapper +from util.grq_client import get_body +from util.job_submitter import try_submit_mozart_job +from util.job_util import supply_job_id +from util.pge_util import get_product_metadata + +logger = None +args = None + +to_json = partial(json.dumps, indent=2) +"""json.dumps with default params""" + +@exec_wrapper +def main(): + global logger + global args + + parser = create_arg_parser() + args = parser.parse_args(sys.argv[1:]) + logger = init_opera_pcm_logger() + logger.info(f"{__file__} invoked with {sys.argv=}") + logger.info(f"{args=}") + + run() + +def run(): + logger.info("BEGIN") + if 
args.producer: + on_dist_s1_publish() + elif args.consumer: + on_state_config_publish() + logger.info("END") + +def on_dist_s1_publish(): + job_id = supply_job_id() + settings = SettingsConf().cfg + + # state config steps + # 1. Note the tile ID and acquisition group (number) from the produced DIST-S1 product + # 2. Create a state-config product that includes that tile ID and acq group to mark the next one as ready to produce + + # 1 + logger.info("Loading job context") + if args.context_file: + logger.info("Custom _context.json provided.") + jc = JobContext(str(args.context_file)) + elif args.b64_context: + logger.info("Custom _context.json contents provided.") + tp = tempfile.NamedTemporaryFile() + tp.write(base64.b64decode(args.b64_context).decode("utf-8")) + tp.flush() + jc = JobContext(tp) + else: + jc = JobContext(str(Path("_context.json").absolute())) + context = context_dict = job_context = jc.ctx + logger.info(f"job_context={to_json(context_dict)}") + + # work_dir = workdir = str(args.workdir) + work_dir = str(Path("_job.json").absolute().parent) + logger.info(f"Preparing Working Directory: {work_dir}") + logger.info(f"{list(Path(work_dir).iterdir())=}") + + logger.info("Reading product_metadata") + if args.b64_product_metadata: + logger.info("Custom product_metadata provided.") + product_metadata = base64.b64decode(args.b64_product_metadata).decode("utf-8") + elif product_metadata_override := context_dict.get("product_metadata_override"): + logger.info("Using product_metadata override from _context.json.") + product_metadata = product_metadata_override + else: + product_metadata = get_product_metadata(context_dict) + logger.info(f"{product_metadata=}") + source_product_metadata = product_metadata + + # 2. 
Create state-config product + logger.info("Creating state-config update metadata") + if output_state_config_override := context.get("output_state_config_override"): + logger.info("Using provided output state-config") + target_product_metadata = output_state_config_override + assert target_product_metadata["batch_id"], "User error. Please supply batch_id in the override." + target_product_metadata["is_complete"] = True # only support this operation + else: + target_product_metadata = { + "version": "test", + "is_complete": True, + # "batch_id": source_product_metadata["input_granule_id"], + "batch_id": source_product_metadata["accountability"]["L3_DIST_S1"]["trigger_dataset_id"], + # "mgrs_tile_id": source_product_metadata["mgrs_tile_id"], + "input_granule_id": source_product_metadata["input_granule_id"], # "p12TYQ_3_S1A_a369" + "mgrs_tile_id": source_product_metadata["input_granule_id"].split("_")[0].removeprefix("p"), + "acquisition_group": source_product_metadata["input_granule_id"].split("_")[1], + "instrument": source_product_metadata["input_granule_id"].split("_")[2], + "acquisition_cycle_index": source_product_metadata["input_granule_id"].split("_")[3].removeprefix("a"), # get suffix + "dist_s1_id": source_product_metadata["id"], + } + logger.info(f"{target_product_metadata=}") + + # output_dir = str(PurePath(work_dir) / job_param_by_name(context, "pge_output_dir")) + batch_id = source_product_metadata["input_granule_id"] # derive from source product (DIST-S1) + batch_id = batch_id.removeprefix("p") + batch_id = batch_id.replace("_a", "_") + target_product_metadata["batch_id"] = batch_id + + state_config_metadata_existing = only(state_configs_by_batch_id(batch_id=batch_id), default={}).get("_source", {}).get("metadata", {}) + state_config_metadata_to_update = {} + state_config_metadata_to_update.update(state_config_metadata_existing) + state_config_metadata_to_update.update(target_product_metadata) + state_config_metadata_to_update.update({"batch_id": 
batch_id, "status": "complete", "is_complete": True, "random": datetime.now().isoformat(timespec="seconds")}) + + target_product_metadata = state_config_metadata_to_update + + # create "current" state-config dataset + logger.info(f"Creating state-config files locally for post-job publishing") + dataset_id = f"DIST_S1_state-config_{batch_id}" + ds_dataset_json = create_ds_dataset_json(version="1.0") + ds_dataset_json_path = write_ds_dataset_json(ds_dataset_json, dataset_id) + ds_met_json_path = write_ds_met_json(target_product_metadata, dataset_id) + dataset_dir = create_dataset(dataset_id=dataset_id, ds_dataset_json=ds_dataset_json_path, ds_met_json=ds_met_json_path, dataset_type="DIST_S1-STATE-CONFIG") + logger.info(f"Created state-config files locally for post-job publishing. {dataset_dir=}") + + logger.info(f"{list(Path(work_dir).iterdir())=}") + return + + +def on_state_config_publish(): + # construct `--product-id-time` param using state-config + # 1. parse state-config + # 2. submit rtc_for_dist job with `--product-id-time` + + job_id = supply_job_id() + settings = SettingsConf().cfg + + logger.info("Loading job context") + if args.context_file: + logger.info("Custom _context.json provided.") + jc = JobContext(str(args.context_file)) + elif args.b64_context: + logger.info("Custom _context.json contents provided.") + tp = tempfile.NamedTemporaryFile() + tp.write(base64.b64decode(args.b64_context).decode("utf-8")) + tp.flush() + jc = JobContext(tp) + else: + jc = JobContext(str(Path("_context.json").absolute())) + context = context_dict = job_context = jc.ctx + logger.info(f"job_context={to_json(context_dict)}") + + # work_dir = workdir = str(args.workdir) + work_dir = str(Path("_job.json").absolute().parent) + logger.info(f"Preparing Working Directory: {work_dir}") + logger.info(f"{list(Path(work_dir).iterdir())=}") + + logger.info("Reading product_metadata") + if args.b64_product_metadata: + logger.info("Custom product_metadata provided.") + 
state_config_metadata = product_metadata = base64.b64decode(args.b64_product_metadata).decode("utf-8") + elif args.use_sample_product_metadata: + logger.info("Using sample product_metadata.") + state_config_metadata = product_metadata = create_sample_state_config() + elif product_metadata_override := context_dict.get("product_metadata_override"): + logger.info("Using product_metadata override from _context.json.") + state_config_metadata = product_metadata = product_metadata_override + else: + state_config_metadata = product_metadata = get_product_metadata(context_dict) + + state_config_metadata_existing = state_config_metadata = product_metadata = one(state_configs_by_batch_id(batch_id=state_config_metadata["batch_id"]))["_source"]["metadata"] + logger.info(f"{product_metadata=}") + + if not state_config_metadata.get("next_product_id_time") or state_config_metadata["next_product_id_time"] == "NULL": + logger.info("No next_product_id_time. Reached end of chain. Nothing further to do.") + logger.info("EXITING.") + return + + # 1. 
+ product_type = "rtc_for_dist" + if state_config_metadata.get("first") and state_config_metadata["first"] != "NULL": + product_id_time = state_config_metadata["product_id_time"] + else: + product_id_time = state_config_metadata["next_product_id_time"] + params = [ + { + "name": "product_id_time", + "from": "value", + "type": "text", + "value": f"--product-id-time={product_id_time}" + } + ] + logger.info(f"{params=}") + query_job_id = try_submit_mozart_job(product={}, + params=params, + job_queue="opera-job_worker-rtc_for_dist_data_query_hist", + rule_name=f"trigger-{product_type}_query_hist", + job_spec=f"job-{product_type}_query_hist:{settings['RELEASE_VERSION']}", + job_type=f"{product_type}_query_hist", # stem of job-spec.json file + job_name=f"job-WF-{product_type}_query_hist-{product_id_time}") + logger.info(f"{query_job_id=}") + return + + +def state_configs_by_batch_id(batch_id): + grq_es = get_grq_es() + body = get_body(match_all=False) + body["query"]["bool"]["must"].append({"term": {"metadata.batch_id.keyword": batch_id}}) + try: + results = grq_es.search(body=body, index="grq_1.0_dist_s1-state-config") + except opensearchpy.exceptions.NotFoundError as e: + # return [] # intentionally commented out and left in for context to reader + raise e + return results["hits"]["hits"] + + +def create_sample_state_config(): + return { + "version": "test", + "is_complete": True, + "mgrs_tile_id": "12TYQ", + "status": "complete", + "batch_id": "12TYQ_3_S1A_369", + "product_id_time": "30RUU_0,20260313T061308Z", + "product_id_times": ["30RUU_0,20260313T061308Z"], + "input_granule_id": "p12TYQ_3_S1A_a369", + "acquisition_group": "3", + "instrument": "S1A", + "acquisition_cycle_index": "369" + } + + +def create_product_metadata_sample(): + return { + "id": "OPERA_L3_DIST-ALERT-S1_T12TYQ_20260225T131640Z_20260225T193352Z_S1A_30_v0.1", + "objectid": "OPERA_L3_DIST-ALERT-S1_T12TYQ_20260225T131640Z_20260225T193352Z_S1A_30_v0.1", + "dataset": "L3_DIST_S1", + "metadata": { + 
"id": "OPERA_L3_DIST-ALERT-S1_T12TYQ_20260225T131640Z_20260225T193352Z_S1A_30_v0.1", + "input_granule_id": "p12TYQ_3_S1A_a369", + "mgrs_tile_id": "12TYQ", + "accountability": { + "L3_DIST_S1": { + "id": "OPERA_L3_DIST-ALERT-S1_T12TYQ_20260225T131640Z_20260225T193352Z_S1A_30_v0.1", + "input_data_type": "L2_RTC_S1", + "trigger_dataset_type": "L2_RTC_S1", + "trigger_dataset_id": "p12TYQ_3_S1A_a369", + "inputs": ["OPERA_L2_RTC-S1_T129-275741-IW2_20260225T131640Z_20260225T172838Z_S1A_30_v1.0_VH.tif"], + # TODO chrisjrd: get from input file group. post copol/crosspol + } + } + }, + "ipath": "hysds::data/L3_DIST_S1", + "system_version": "v0.1", + "dataset_level": "L3", + "dataset_type": "L3_DIST_S1", + "version": "v0.1", + "creation_timestamp": "2026-02-25T19:34:09.324", + "@timestamp": "2026-02-25T19:34:45.836928Z" + } + + +def init_opera_pcm_logger(): + logger = get_logger(log_format_override="%(asctime)s %(levelname)7s %(name)13s:%(filename)19s:%(funcName)22s:%(lineno)3s - %(message)s") + configure_library_loggers() + return logger + + +def create_arg_parser(): + parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.ArgumentDefaultsHelpFormatter) + # parser.add_argument("workdir", help="The absolute pathname of the current working directory.", type=lambda p: Path(p).absolute()) + group_producer_consumer = parser.add_mutually_exclusive_group(required=True) + group_producer_consumer.add_argument("--producer", action="store_true", help="Run in producer mode. This tool will, given a product publication, upsert a state-config document.") + group_producer_consumer.add_argument("--consumer", action="store_true", help="Run in consumer mode. 
This tool will, given a state-config document upsert, submit an RTC query job.") + + parser.add_argument("--smoke-run", action="store_true", help="Toggle for processing a single output.") + parser.add_argument("--dry-run", action="store_true", help="Toggle for skipping network transfers.") + + group_context = parser.add_mutually_exclusive_group() + group_context.add_argument("--context-file", dest="context_file", required=False, help="The context file in the workspace. Typically \"_context.json\".", type=lambda p: Path(p).absolute()) + group_context.add_argument("--b64-context", dest="b64_context", help="Custom _context.json contents. base64-encoded.") + + group_product_metadata = parser.add_mutually_exclusive_group() + group_product_metadata.add_argument("--b64-product-metadata", dest="b64_product_metadata", help="Custom product metadata JSON typically found in _context.json. base64-encoded.") + group_product_metadata.add_argument("--use-sample-product-metadata", dest="use_sample_product_metadata", action="store_true", help="Use hardcoded product_metadata.") + + return parser + + +if __name__ == '__main__': + main() diff --git a/dist_s1/test_state_config_producer.sh b/dist_s1/test_state_config_producer.sh new file mode 100755 index 000000000..33391b32f --- /dev/null +++ b/dist_s1/test_state_config_producer.sh @@ -0,0 +1,32 @@ +#!/bin/bash + +echo "args: $*" + +BASE_PATH=$(dirname "${BASH_SOURCE}") +BASE_PATH=$(cd "${BASE_PATH}"; pwd) + +# source PGE env +export OPERA_HOME=/home/ops/verdi/ops/opera-pcm +export PYTHONPATH=$BASE_PATH:$OPERA_HOME:$PYTHONPATH +export PATH=$BASE_PATH:$PATH +export PYTHONDONTWRITEBYTECODE=1 +export LD_LIBRARY_PATH=/opt/conda/lib:$LD_LIBRARY_PATH + +# source environment +source $HOME/verdi/bin/activate + +echo "##########################################" +echo "Running job to test state config production" +date + +python $OPERA_HOME/dist_s1/state_config_producer.py "$@" > run_job.log 2>&1 + +if [ $? 
-eq 0 ]; then + echo "Finished running job" + date + exit 0 +else + echo "Failed to run state_config_producer.py" + date + exit 1 +fi diff --git a/docker/hysds-io.json.rtc_for_dist_query b/docker/hysds-io.json.rtc_for_dist_query index 3af026a45..71b30a505 100644 --- a/docker/hysds-io.json.rtc_for_dist_query +++ b/docker/hysds-io.json.rtc_for_dist_query @@ -42,7 +42,7 @@ "from": "submitter", "placeholder": "e.g. --job-queue=", "type": "text", - "default": "--job-queue=opera-job_worker-cslc_data_download" + "default": "--job-queue=opera-job_worker-rtc_for_dist_data_download" }, { "name": "chunk_size", @@ -55,9 +55,9 @@ { "name": "k_offsets_counts", "from": "submitter", - "placeholder": "e.g. --k-offsets-counts=[(365, 4), (730, 3), (1095,3)]", + "placeholder": "e.g. --k-offsets-counts=[(365,4),(730,3),(1095,3)]", "type": "text", - "default": "--k-offsets-counts=[(365, 4), (730, 3), (1095,3)]", + "default": "--k-offsets-counts=[(365,4),(730,3),(1095,3)]", "optional": true }, { @@ -110,7 +110,7 @@ { "name": "processing_mode", "from": "submitter", - "placeholder": "e.g. --processing-mode", + "placeholder": "e.g. --processing-mode=forward", "optional": true }, { diff --git a/docker/hysds-io.json.rtc_for_dist_query_dist_on_pub b/docker/hysds-io.json.rtc_for_dist_query_dist_on_pub new file mode 100644 index 000000000..fe7c511b3 --- /dev/null +++ b/docker/hysds-io.json.rtc_for_dist_query_dist_on_pub @@ -0,0 +1,18 @@ +{ + "label": "DIST-S1. 
On-publication listener", + "submission_type":"individual", + "allowed_accounts": [ "ops" ], + "params": [ + { + "name": "mode", + "from": "value", + "type": "text", + "value": "--producer" + }, + { + "name": "product_metadata", + "from": "dataset_jpath:_source", + "lambda" : "lambda ds: { 'metadata': ds['metadata'] }" + } + ] +} diff --git a/docker/hysds-io.json.rtc_for_dist_query_hist b/docker/hysds-io.json.rtc_for_dist_query_hist new file mode 100644 index 000000000..bbad91fbd --- /dev/null +++ b/docker/hysds-io.json.rtc_for_dist_query_hist @@ -0,0 +1,68 @@ +{ + "label": "Query RTC data with product-id-time for DIST-S1 triggering. Historical mode. INTERNAL USE ONLY", + "submission_type":"individual", + "allowed_accounts": [ "ops" ], + "params": [ + { + "name": "download_job_queue", + "from": "submitter", + "placeholder": "e.g. --job-queue=", + "type": "text", + "default": "--job-queue=opera-job_worker-rtc_for_dist_data_download_hist" + }, + { + "name": "chunk_size", + "from": "submitter", + "placeholder": "e.g. --chunk-size=", + "type": "text", + "default": "--chunk-size=1", + "optional": true + }, + { + "name": "k_offsets_counts", + "from": "submitter", + "placeholder": "e.g. --k-offsets-counts=[(365,4),(730,3),(1095,3)]", + "type": "text", + "default": "--k-offsets-counts=[(365,4),(730,3),(1095,3)]", + "optional": true + }, + { + "name": "smoke_run", + "from": "submitter", + "placeholder": "e.g. --smoke-run", + "optional": true + }, + { + "name": "dry_run", + "from": "submitter", + "placeholder": "e.g. --dry-run", + "optional": true + }, + { + "name": "use_temporal", + "from": "submitter", + "placeholder": "e.g. --use-temporal", + "type": "text", + "default": "--use-temporal" + }, + { + "name": "processing_mode", + "from": "submitter", + "placeholder": "e.g. --processing-mode=historical", + "type": "text", + "default": "--processing-mode=historical" + }, + { + "name": "transfer_protocol", + "from": "submitter", + "placeholder": "e.g. 
--transfer-protocol=auto", + "type": "text", + "default": "--transfer-protocol=auto" + }, + { + "name": "product_id_time", + "from": "dataset_jpath:_source", + "lambda" : "lambda ds: '--product-id-time='+ds['metadata']['product_id_time']" + } + ] +} diff --git a/docker/hysds-io.json.rtc_for_dist_query_sc_on_complete b/docker/hysds-io.json.rtc_for_dist_query_sc_on_complete new file mode 100644 index 000000000..812749670 --- /dev/null +++ b/docker/hysds-io.json.rtc_for_dist_query_sc_on_complete @@ -0,0 +1,18 @@ +{ + "label": "DIST-S1. On-state-config-complete listener", + "submission_type":"individual", + "allowed_accounts": [ "ops" ], + "params": [ + { + "name": "mode", + "from": "value", + "type": "text", + "value": "--consumer" + }, + { + "name": "product_metadata", + "from": "dataset_jpath:_source", + "lambda" : "lambda ds: { 'metadata': ds['metadata'] }" + } + ] +} diff --git a/docker/hysds-io.json.rtc_for_dist_query_sc_on_first b/docker/hysds-io.json.rtc_for_dist_query_sc_on_first new file mode 100644 index 000000000..db7819562 --- /dev/null +++ b/docker/hysds-io.json.rtc_for_dist_query_sc_on_first @@ -0,0 +1,18 @@ +{ + "label": "DIST-S1. On-state-config-first listener", + "submission_type":"individual", + "allowed_accounts": [ "ops" ], + "params": [ + { + "name": "mode", + "from": "value", + "type": "text", + "value": "--consumer" + }, + { + "name": "product_metadata", + "from": "dataset_jpath:_source", + "lambda" : "lambda ds: { 'metadata': ds['metadata'] }" + } + ] +} diff --git a/docker/hysds-io.json.test_state_config_production b/docker/hysds-io.json.test_state_config_production new file mode 100644 index 000000000..cc18c90a0 --- /dev/null +++ b/docker/hysds-io.json.test_state_config_production @@ -0,0 +1,46 @@ +{ + "label": "Test DIST S1 state config production", + "submission_type":"individual", + "allowed_accounts": [ "ops" ], + "params": [ + { + "name": "smoke_run", + "from": "submitter", + "placeholder": "e.g. 
--smoke-run", + "optional": true + }, + { + "name": "dry_run", + "from": "submitter", + "placeholder": "e.g. --dry-run", + "optional": true + }, + { + "name": "more_args", + "from": "submitter", + "placeholder": "e.g. --verbose ...", + "optional": true + }, + { + "name": "product_metadata_override", + "from": "submitter", + "type": "object", + "placeholder": "{\"foo\": \"bar\"}", + "optional": false + }, + { + "name": "output_state_config_override", + "from": "submitter", + "type": "object", + "placeholder": "{\"foo\": \"bar\"}", + "optional": false + }, + { + "name": "context_overrides", + "from": "submitter", + "type": "object", + "placeholder": "{\"foo\": \"bar\"}", + "optional": false + } + ] +} diff --git a/docker/job-spec.json.rtc_for_dist_query_dist_on_pub b/docker/job-spec.json.rtc_for_dist_query_dist_on_pub new file mode 100644 index 000000000..bd1991734 --- /dev/null +++ b/docker/job-spec.json.rtc_for_dist_query_dist_on_pub @@ -0,0 +1,23 @@ +{ + "command":"/home/ops/verdi/ops/opera-pcm/dist_s1/test_state_config_producer.sh", + "disk_usage":"1GB", + "soft_time_limit": 9939, + "time_limit": 9999, + "imported_worker_files": { + "$HOST_VERDI_HOME/.netrc": "/home/ops/.netrc", + "$HOST_VERDI_HOME/.aws": "/home/ops/.aws", + "$HOST_VERDI_HOME/verdi/etc/settings.yaml": "/home/ops/verdi/ops/opera-pcm/conf/settings.yaml" + }, + "recommended-queues": [ "opera-job_worker-dist_s1_hist_on_publication" ], + "post": [ "hysds.triage.triage" ], + "params": [ + { + "name": "mode", + "destination": "positional" + }, + { + "name": "product_metadata", + "destination": "context" + } + ] +} diff --git a/docker/job-spec.json.rtc_for_dist_query_hist b/docker/job-spec.json.rtc_for_dist_query_hist new file mode 100644 index 000000000..a82eb4cc2 --- /dev/null +++ b/docker/job-spec.json.rtc_for_dist_query_hist @@ -0,0 +1,51 @@ +{ + "command":"/home/ops/verdi/ops/opera-pcm/data_subscriber/rtc_for_dist/rtc_for_dist_query.sh", + "disk_usage":"1GB", + "soft_time_limit": 9939, + 
"time_limit": 9999, + "imported_worker_files": { + "$HOST_VERDI_HOME/.netrc": "/home/ops/.netrc", + "$HOST_VERDI_HOME/.aws": "/home/ops/.aws", + "$HOST_VERDI_HOME/verdi/etc/settings.yaml": "/home/ops/verdi/ops/opera-pcm/conf/settings.yaml" + }, + "recommended-queues": [ "opera-job_worker-rtc_for_dist_data_query_hist" ], + "post": [ "hysds.triage.triage" ], + "params": [ + { + "name": "download_job_queue", + "destination": "positional" + }, + { + "name": "chunk_size", + "destination": "positional" + }, + { + "name": "k_offsets_counts", + "destination": "positional" + }, + { + "name": "smoke_run", + "destination": "positional" + }, + { + "name": "dry_run", + "destination": "positional" + }, + { + "name": "use_temporal", + "destination": "positional" + }, + { + "name": "processing_mode", + "destination": "positional" + }, + { + "name": "transfer_protocol", + "destination": "positional" + }, + { + "name": "product_id_time", + "destination": "positional" + } + ] +} diff --git a/docker/job-spec.json.rtc_for_dist_query_sc_on_complete b/docker/job-spec.json.rtc_for_dist_query_sc_on_complete new file mode 100644 index 000000000..4a5fd9c06 --- /dev/null +++ b/docker/job-spec.json.rtc_for_dist_query_sc_on_complete @@ -0,0 +1,23 @@ +{ + "command":"/home/ops/verdi/ops/opera-pcm/dist_s1/test_state_config_producer.sh", + "disk_usage":"1GB", + "soft_time_limit": 9939, + "time_limit": 9999, + "imported_worker_files": { + "$HOST_VERDI_HOME/.netrc": "/home/ops/.netrc", + "$HOST_VERDI_HOME/.aws": "/home/ops/.aws", + "$HOST_VERDI_HOME/verdi/etc/settings.yaml": "/home/ops/verdi/ops/opera-pcm/conf/settings.yaml" + }, + "recommended-queues": [ "opera-job_worker-dist_s1_hist_on_complete" ], + "post": [ "hysds.triage.triage" ], + "params": [ + { + "name": "mode", + "destination": "positional" + }, + { + "name": "product_metadata", + "destination": "context" + } + ] +} diff --git a/docker/job-spec.json.rtc_for_dist_query_sc_on_first b/docker/job-spec.json.rtc_for_dist_query_sc_on_first new 
file mode 100644 index 000000000..22e352e89 --- /dev/null +++ b/docker/job-spec.json.rtc_for_dist_query_sc_on_first @@ -0,0 +1,23 @@ +{ + "command":"/home/ops/verdi/ops/opera-pcm/dist_s1/test_state_config_producer.sh", + "disk_usage":"1GB", + "soft_time_limit": 9939, + "time_limit": 9999, + "imported_worker_files": { + "$HOST_VERDI_HOME/.netrc": "/home/ops/.netrc", + "$HOST_VERDI_HOME/.aws": "/home/ops/.aws", + "$HOST_VERDI_HOME/verdi/etc/settings.yaml": "/home/ops/verdi/ops/opera-pcm/conf/settings.yaml" + }, + "recommended-queues": [ "opera-job_worker-dist_s1_hist_on_first" ], + "post": [ "hysds.triage.triage" ], + "params": [ + { + "name": "mode", + "destination": "positional" + }, + { + "name": "product_metadata", + "destination": "context" + } + ] +} diff --git a/docker/job-spec.json.test_state_config_production b/docker/job-spec.json.test_state_config_production new file mode 100644 index 000000000..673568cff --- /dev/null +++ b/docker/job-spec.json.test_state_config_production @@ -0,0 +1,39 @@ +{ + "command":"/home/ops/verdi/ops/opera-pcm/dist_s1/test_state_config_producer.sh", + "disk_usage":"1GB", + "soft_time_limit": 660, + "time_limit": 3600, + "imported_worker_files": { + "$HOST_VERDI_HOME/.netrc": "/home/ops/.netrc", + "$HOST_VERDI_HOME/.aws": "/home/ops/.aws", + "$HOST_VERDI_HOME/verdi/etc/settings.yaml": "/home/ops/verdi/ops/opera-pcm/conf/settings.yaml" + }, + "recommended-queues": [ "opera-job_worker-dist_s1_state_config" ], + "post": [ "hysds.triage.triage" ], + "params": [ + { + "name": "smoke_run", + "destination": "positional" + }, + { + "name": "dry_run", + "destination": "positional" + }, + { + "name": "more_args", + "destination": "positional" + }, + { + "name": "product_metadata_override", + "destination": "context" + }, + { + "name": "output_state_config_override", + "destination": "context" + }, + { + "name": "context_overrides", + "destination": "context" + } + ] +} diff --git a/extractor/extract.py b/extractor/extract.py index 
2fc458e1d..687472d2a 100755 --- a/extractor/extract.py +++ b/extractor/extract.py @@ -231,7 +231,7 @@ def create_dataset_id(product, product_types): dataset_id = match.groupdict()[REGEX_ID_KEY] # Otherwise, default to using the product's filename to derive the dataset ID else: - if product_types[product_type][STRIP_FILE_EXTENSION_KEY]: + if product_types[product_type].get(STRIP_FILE_EXTENSION_KEY, False): dataset_id = os.path.splitext(os.path.basename(product))[0] else: dataset_id = os.path.basename(product) diff --git a/opera_commons/logger.py b/opera_commons/logger.py index 4ce4a9307..847393871 100644 --- a/opera_commons/logger.py +++ b/opera_commons/logger.py @@ -63,7 +63,7 @@ def filter(self, record): logger_initialized = False -def get_logger(verbose=False, quiet=False): +def get_logger(verbose=False, quiet=False, log_format_override=None): global logger_initialized if not logger_initialized: @@ -79,10 +79,13 @@ def get_logger(verbose=False, quiet=False): log_format = '[%(asctime)s: %(levelname)s/%(module)s:%(funcName)s:%(lineno)d] %(message)s' else: log_format = "[%(asctime)s: %(levelname)s/%(funcName)s] %(message)s" + if log_format_override: + log_format = log_format_override logging.basicConfig(level=log_level, format=log_format, force=True) logger.addFilter(NoLogUtilsFilter()) + logger.info("Added logging filter for elasticsearch_utils/opensearch_utils") logger_initialized = True logger.info("Initial logging configuration complete") @@ -92,20 +95,22 @@ def get_logger(verbose=False, quiet=False): class NoLogUtilsFilter(logging.Filter): - """Filters out large JSON output of HySDS internals. Apply to any logger (typically __main__) or its handlers.""" + def filter(self, record): if not record.filename == "elasticsearch_utils.py": return True + if not record.filename == "opensearch_utils.py": + return True return record.funcName != "update_document" class NoJobUtilsFilter(logging.Filter): - """Filters out large JSON output of HySDS internals. 
Apply to the logger named "hysds_commons" or one of its handlers.""" + def filter(self, record): if not record.filename == "job_utils.py": return True diff --git a/tools/dist_s1_burst_db_tool.py b/tools/dist_s1_burst_db_tool.py index 7ca416fab..612b65139 100755 --- a/tools/dist_s1_burst_db_tool.py +++ b/tools/dist_s1_burst_db_tool.py @@ -5,13 +5,12 @@ import pickle import argparse import csv -from tqdm import tqdm import geopandas as gpd import requests from datetime import datetime, timedelta -from data_subscriber.url import determine_acquisition_cycle from data_subscriber.cslc_utils import parse_r2_product_file_name from data_subscriber.dist_s1_utils import parse_local_burst_db_pickle, localize_dist_burst_db, trigger_from_cmr_survey_csv +from rtc_utils import determine_acquisition_cycle burst_geometry_file_url = "https://github.com/opera-adt/burst_db/releases/download/v0.9.0/burst-id-geometries-simple-0.9.0.geojson.zip" burst_geometry_file = "burst-id-geometries-simple-0.9.0.geojson.zip" @@ -22,56 +21,37 @@ logging.basicConfig(level="INFO") logger = logging.getLogger(__name__) -parser = argparse.ArgumentParser() -parser.add_argument("--verbose", dest="verbose", help="If true, print out verbose information.", required=False, default=False) -parser.add_argument("--db-file", dest="db_file", help="Specify the DIST-S1 burst database parquet file \ + +def create_parser(): + parser = argparse.ArgumentParser() + parser.add_argument("--verbose", dest="verbose", help="If true, print out verbose information.", required=False, default=False) + parser.add_argument("--db-file", dest="db_file", help="Specify the DIST-S1 burst database parquet file \ on the local file system instead of using the standard one in S3 ancillary", required=False) -parser.add_argument("--no-geometry", dest="no_geometry", action="store_true", - help="Do not print burst geometry information. 
This speeds up this tool significantly.", required=False, default=False) -subparsers = parser.add_subparsers(dest="subparser_name", required=True) - -server_parser = subparsers.add_parser("list", help="List all tile numbers") - -server_parser = subparsers.add_parser("summary", help="List all tile numbers, number of products and their bursts") - -server_parser = subparsers.add_parser("native_id", help="Print information based on native_id") -server_parser.add_argument("native_id", help="The RTC native id from CMR") - -server_parser = subparsers.add_parser("tile_id", help="Print information based on tile") -server_parser.add_argument("tile_id", help="The tile ID") -server_parser.add_argument("--first-product-datetime", help="Use the first product datetime to generate datetime for rest of the products in this tile", required=False, default=None) - -server_parser = subparsers.add_parser("burst_id", help="Print information based on burst id.") -server_parser.add_argument("burst_id", help="Burst id looks like T175-374393-IW1.") - -server_parser = subparsers.add_parser("trigger_granules", help="Run the list of granules through the triggering logic. Listed by increasing latest acquisition time.") -server_parser.add_argument("cmr_survey_csv", help="The cmr survey csv file") -server_parser.add_argument("--complete-tiles-only", help="Only trigger complete tiles", required=False, default=False) -server_parser.add_argument("--tile-to-trigger", help="Only trigger a specific tile. 
This will print out all the RTC granules used in triggering.", required=False, default=None) - -args = parser.parse_args() - -if args.db_file: - # First see if a pickle file exists - pickle_file_name = args.db_file + ".pickle" - dist_products, bursts_to_products, product_to_bursts, all_tile_ids = parse_local_burst_db_pickle(args.db_file, pickle_file_name) -else: - dist_products, bursts_to_products, product_to_bursts, all_tile_ids = localize_dist_burst_db() - -if args.no_geometry is False: - #Check to see if burst_geometry_file exists on the local filesystem - if not os.path.exists(burst_geometry_file): - print(f"Downloading burst geometry file from {burst_geometry_file_url}") - response = requests.get(burst_geometry_file_url) - response.raise_for_status() - with open(burst_geometry_file, 'wb') as f: - f.write(response.content) - else: - print(f"Using existing burst geometry file: {burst_geometry_file}") - print(f"Reading burst geometry file: {burst_geometry_file}") - burst_grid = gpd.read_file(burst_geometry_file) + parser.add_argument("--no-geometry", dest="no_geometry", action="store_true", + help="Do not print burst geometry information. 
This speeds up this tool significantly.", required=False, default=False) + subparsers = parser.add_subparsers(dest="subparser_name", required=True) + + server_parser = subparsers.add_parser("list", help="List all tile numbers") + + server_parser = subparsers.add_parser("summary", help="List all tile numbers, number of products and their bursts") + + server_parser = subparsers.add_parser("native_id", help="Print information based on native_id") + server_parser.add_argument("native_id", help="The RTC native id from CMR") -def get_burst_geometry(burst_id): + server_parser = subparsers.add_parser("tile_id", help="Print information based on tile") + server_parser.add_argument("tile_id", help="The tile ID") + server_parser.add_argument("--first-product-datetime", help="Use the first product datetime to generate datetime for rest of the products in this tile", required=False, default=None) + + server_parser = subparsers.add_parser("burst_id", help="Print information based on burst id.") + server_parser.add_argument("burst_id", help="Burst id looks like T175-374393-IW1.") + + server_parser = subparsers.add_parser("trigger_granules", help="Run the list of granules through the triggering logic. Listed by increasing latest acquisition time.") + server_parser.add_argument("cmr_survey_csv", help="The cmr survey csv file") + server_parser.add_argument("--complete-tiles-only", help="Only trigger complete tiles", required=False, default=False) + server_parser.add_argument("--tile-to-trigger", help="Only trigger a specific tile. 
This will print out all the RTC granules used in triggering.", required=False, default=None) + return parser + +def get_burst_geometry(burst_grid, burst_id): """Get the geometry of a burst given its ID.""" burst_id_converted = burst_id.lower().replace('-', '_') burst_geom = burst_grid[burst_grid['burst_id_jpl'] == burst_id_converted] @@ -80,96 +60,117 @@ def get_burst_geometry(burst_id): return None return burst_geom.geometry.iloc[0].bounds -if args.subparser_name == "list": - l = list(all_tile_ids) - print("Tile IDs (%d): \n" % len(l), l) - -elif args.subparser_name == "summary": - - # Print out the number of tiles, products, and unique bursts - print("Number of tiles: ", len(all_tile_ids)) - print("Number of products: ", len(product_to_bursts.keys())) - print("Number of unique bursts: ", len(bursts_to_products.keys())) - - # Find the tile with the most products and then print out all the products and their bursts - tile_with_most_products = max(dist_products.items(), key=lambda x: len(x[1])) - print("Tile with most products: ", tile_with_most_products[0], "with", len(tile_with_most_products[1]), "products") - print("Tile ID, Number of Products, Product IDs, Bursts") - tile_id = tile_with_most_products[0] - for product_id in sorted(list(dist_products[tile_id])): - burst_ids = sorted(list(product_to_bursts[product_id])) - print(f"{product_id} ({len(burst_ids)} bursts): {burst_ids}") - -elif args.subparser_name == "native_id": - burst_id, acquisition_dts = parse_r2_product_file_name(args.native_id, "L2_RTC_S1") - acquisition_index = determine_acquisition_cycle(burst_id, acquisition_dts, args.native_id) - products = bursts_to_products[burst_id] - - if len(products) == 0: - print("No DIST-S1 products are associated with burst id: ", burst_id) - exit(-1) - - print("Burst id: ", burst_id) - print("Acquisition datetime: ", acquisition_dts) - print("Acquisition index: ", acquisition_index) - print("Product IDs: ", products) - for product in products: - 
print("--product-id-time: ", f"{product},{acquisition_dts}") - if args.no_geometry is False: - print("Burst geometry minx, miny, maxx, maxy: ", get_burst_geometry(burst_id)) - -elif args.subparser_name == "tile_id": - tile_id = args.tile_id - if tile_id not in dist_products.keys(): - print("Tile ID: ", tile_id, "does not exist") - exit(-1) +def main(): + parser = create_parser() + args = parser.parse_args() - # datetime looks like this: 20250614T015042Z - if args.first_product_datetime: - first_product_datetime = datetime.strptime(args.first_product_datetime, "%Y%m%dT%H%M%SZ") + if args.db_file: + # First see if a pickle file exists + pickle_file_name = args.db_file + ".pickle" + dist_products, bursts_to_products, product_to_bursts, all_tile_ids = parse_local_burst_db_pickle(args.db_file, pickle_file_name) else: - first_product_datetime = None - - print("Tile ID: ", tile_id) - print("Product IDs and burst ids: ") - product_ids = sorted(list(dist_products[tile_id])) - first_product_first_burst_id = sorted(list(product_to_bursts[product_ids[0]]))[0] - first_burst_identification_number = int(first_product_first_burst_id.split("-")[1]) - for product_id in product_ids: - burst_ids = sorted(list(product_to_bursts[product_id])) - if first_product_datetime: - current_burst_identification_number = int(burst_ids[0].split("-")[1]) - delta_seconds = 12 * 24 * 60 * 60 * (current_burst_identification_number - first_burst_identification_number) / 375887 - product_datetime = first_product_datetime + timedelta(seconds=delta_seconds) - product_datetime_str = product_datetime.strftime("%Y%m%dT%H%M%SZ") - print(f"{product_id} ({product_datetime_str}) ({len(burst_ids)} bursts): {burst_ids}") - else: - print(f"{product_id} ({len(burst_ids)} bursts): {burst_ids}") + dist_products, bursts_to_products, product_to_bursts, all_tile_ids = localize_dist_burst_db() -elif args.subparser_name == "burst_id": - burst_id = args.burst_id - if burst_id not in bursts_to_products.keys(): - print("Burst 
id: ", burst_id, "is not associated with any products") - exit(-1) - - print("Burst id: ", burst_id) - product_ids = bursts_to_products[burst_id] - print("Product IDs: ({len(product_ids))", product_ids) if args.no_geometry is False: - print("Burst geometry minx, miny, maxx, maxy: ", get_burst_geometry(burst_id)) + #Check to see if burst_geometry_file exists on the local filesystem + if not os.path.exists(burst_geometry_file): + print(f"Downloading burst geometry file from {burst_geometry_file_url}") + response = requests.get(burst_geometry_file_url) + response.raise_for_status() + with open(burst_geometry_file, 'wb') as f: + f.write(response.content) + else: + print(f"Using existing burst geometry file: {burst_geometry_file}") + print(f"Reading burst geometry file: {burst_geometry_file}") + burst_grid = gpd.read_file(burst_geometry_file) + + if args.subparser_name == "list": + l = list(all_tile_ids) + print("Tile IDs (%d): \n" % len(l), l) + elif args.subparser_name == "summary": + + # Print out the number of tiles, products, and unique bursts + print("Number of tiles: ", len(all_tile_ids)) + print("Number of products: ", len(product_to_bursts.keys())) + print("Number of unique bursts: ", len(bursts_to_products.keys())) + + # Find the tile with the most products and then print out all the products and their bursts + tile_with_most_products = max(dist_products.items(), key=lambda x: len(x[1])) + print("Tile with most products: ", tile_with_most_products[0], "with", len(tile_with_most_products[1]), "products") + print("Tile ID, Number of Products, Product IDs, Bursts") + tile_id = tile_with_most_products[0] + for product_id in sorted(list(dist_products[tile_id])): + burst_ids = sorted(list(product_to_bursts[product_id])) + print(f"{product_id} ({len(burst_ids)} bursts): {burst_ids}") + elif args.subparser_name == "native_id": + burst_id, acquisition_dts = parse_r2_product_file_name(args.native_id, "L2_RTC_S1") + acquisition_index = 
determine_acquisition_cycle(burst_id, acquisition_dts, args.native_id) + products = bursts_to_products[burst_id] + + if len(products) == 0: + print("No DIST-S1 products are associated with burst id: ", burst_id) + exit(-1) + + print("Burst id: ", burst_id) + print("Acquisition datetime: ", acquisition_dts) + print("Acquisition index: ", acquisition_index) + print("Product IDs: ", products) + for product in products: + print("--product-id-time: ", f"{product},{acquisition_dts}") + if args.no_geometry is False: + print("Burst geometry minx, miny, maxx, maxy: ", get_burst_geometry(burst_grid, burst_id)) + elif args.subparser_name == "tile_id": + tile_id = args.tile_id + if tile_id not in dist_products.keys(): + print("Tile ID: ", tile_id, "does not exist") + exit(-1) + + # datetime looks like this: 20250614T015042Z + if args.first_product_datetime: + first_product_datetime = datetime.strptime(args.first_product_datetime, "%Y%m%dT%H%M%SZ") + else: + first_product_datetime = None + + print("Tile ID: ", tile_id) + print("Product IDs and burst ids: ") + product_ids = sorted(list(dist_products[tile_id])) + first_product_first_burst_id = sorted(list(product_to_bursts[product_ids[0]]))[0] + first_burst_identification_number = int(first_product_first_burst_id.split("-")[1]) + for product_id in product_ids: + burst_ids = sorted(list(product_to_bursts[product_id])) + if first_product_datetime: + current_burst_identification_number = int(burst_ids[0].split("-")[1]) + delta_seconds = 12 * 24 * 60 * 60 * (current_burst_identification_number - first_burst_identification_number) / 375887 + product_datetime = first_product_datetime + timedelta(seconds=delta_seconds) + product_datetime_str = product_datetime.strftime("%Y%m%dT%H%M%SZ") + print(f"{product_id} ({product_datetime_str}) ({len(burst_ids)} bursts): {burst_ids}") + else: + print(f"{product_id} ({len(burst_ids)} bursts): {burst_ids}") + elif args.subparser_name == "burst_id": + burst_id = args.burst_id + if burst_id not in 
bursts_to_products.keys(): + print("Burst id: ", burst_id, "is not associated with any products") + exit(-1) + + print("Burst id: ", burst_id) + product_ids = bursts_to_products[burst_id] + print("Product IDs: ({len(product_ids))", product_ids) + if args.no_geometry is False: + print("Burst geometry minx, miny, maxx, maxy: ", get_burst_geometry(burst_grid, burst_id)) + elif args.subparser_name == "trigger_granules": + print("Triggering granules") + + products_triggered, _, __, ___ = trigger_from_cmr_survey_csv(args.cmr_survey_csv, 0, datetime.now(), product_to_bursts, bursts_to_products, complete_bursts_only=args.complete_tiles_only) -elif args.subparser_name == "trigger_granules": - print("Triggering granules") + if args.tile_to_trigger: + products_triggered = {k: v for k, v in products_triggered.items() if k.startswith(args.tile_to_trigger)} - products_triggered, _, __, ___ = trigger_from_cmr_survey_csv(args.cmr_survey_csv, 0, datetime.now(), product_to_bursts, bursts_to_products, complete_bursts_only=args.complete_tiles_only) - - if args.tile_to_trigger: - products_triggered = {k: v for k, v in products_triggered.items() if k.startswith(args.tile_to_trigger)} + # Sort products_triggered by their latest acquisition time + products_triggered_sorted = sorted(products_triggered.items(), key=lambda x: x[1].latest_acquisition) + for product_id, product in products_triggered_sorted: + print(f"{product_id=} {product.latest_acquisition.strftime('%Y-%m-%d %H:%M:%S')} {product.used_bursts=} {product.possible_bursts=}") + if args.tile_to_trigger: + print(f"RTC granules: {product.rtc_granules}\n") - # Sort products_triggered by their latest acquisition time - products_triggered_sorted = sorted(products_triggered.items(), key=lambda x: x[1].latest_acquisition) - for product_id, product in products_triggered_sorted: - print(f"{product_id=} {product.latest_acquisition.strftime('%Y-%m-%d %H:%M:%S')} {product.used_bursts=} {product.possible_bursts=}") - if args.tile_to_trigger: - 
print(f"RTC granules: {product.rtc_granules}\n") - +if __name__ == '__main__': + main() diff --git a/util/ctx_util.py b/util/ctx_util.py index ceff838bc..adb66cfdd 100644 --- a/util/ctx_util.py +++ b/util/ctx_util.py @@ -83,3 +83,17 @@ def get(self, key): "Docker params '{}' doesn't exist in {}.".format(key, self._file) ) ) + + +def job_param_by_name(context: dict, name: str): + """ + Gets the job specification parameter from the _context.json file. + :param context: the dict representation of _context.json. + :param name: the name of the job specification parameter. + """ + + for param in context["job_specification"]["params"]: + if param["name"] == name: + return param["value"] + + raise Exception(f"param ({name}) not found in _context.json") diff --git a/util/datasets_json_util.py b/util/datasets_json_util.py index 2f7cb9927..02cc1baba 100644 --- a/util/datasets_json_util.py +++ b/util/datasets_json_util.py @@ -11,13 +11,24 @@ class DatasetsJson: def __init__(self, file: Optional[str] = None): """Constructor. Parses datasets.json - :param file: filepath to datasets.json. Defaults to "../conf/sds/files/datasets.json", relative to this module. + :param file: filepath to datasets.json. Defaults to checking Mozart, Verdi, + and then relative paths. 
""" - if file is None: - file = norm_path( - os.path.join(os.path.dirname(__file__), "..", "conf", "sds", "files", "datasets.json") - ) + # Intercept if file is None OR if it's the hardcoded string from dataset_util.py + if file is None or (file == "datasets.json" and not os.path.exists(file)): + mozart_path = os.path.expanduser("~/mozart/ops/opera-pcm/conf/sds/files/datasets.json") + verdi_path = os.path.expanduser("~/verdi/etc/datasets.json") + + if os.path.exists(mozart_path): + file = mozart_path + elif os.path.exists(verdi_path): + file = verdi_path + else: + # Fallback to the original relative path logic + file = norm_path( + os.path.join(os.path.dirname(__file__), "..", "conf", "sds", "files", "datasets.json") + ) # Open up the datasets.json file and create a dictionary of datasets keyed by dataset type with open(file) as f: @@ -28,6 +39,7 @@ def get(self, key): '''Returns the dataset with the given key. Key is the dataset type.''' return self._datasets_json[key] + # TODO: Refactor so that all the functions below are methods of DatasetsJson def find_publish_location_s3(datasets_json, dataset_type): diff --git a/util/grq_client.py b/util/grq_client.py index d1e256618..729765367 100644 --- a/util/grq_client.py +++ b/util/grq_client.py @@ -77,18 +77,22 @@ def get_body(match_all=True) -> dict: def get_range( datetime_fieldname="creation_timestamp", start_dt_iso="1970-01-01", - end_dt_iso="9999-12-31T23:59:59.999" + end_dt_iso="9999-12-31T23:59:59.999", + gt: Literal["gt", "gte"] = "gte", + lt: Literal["lt", "lte"] = "lt" ) -> dict: """ Returns a query range filter typically set in an Elasticsearch body's $.query.bool.must[] section. The default range is from 1970 to the year 10,000. The "from" datetime uses "gte" and the "to" datetime uses "lt". 
+ + Example: {"range": {"creation_timestamp": {"gte": "1970-01-01T00:00:00"}, "lt": "9999-12-31T23:59:59.999"}} """ return { "range": { datetime_fieldname: { - "gte": start_dt_iso, - "lt": end_dt_iso + gt: start_dt_iso, + lt: end_dt_iso } } } diff --git a/wrapper/opera_pge_wrapper.py b/wrapper/opera_pge_wrapper.py index 9a58a99ed..f2ac83818 100644 --- a/wrapper/opera_pge_wrapper.py +++ b/wrapper/opera_pge_wrapper.py @@ -34,7 +34,7 @@ from product2dataset import product2dataset from util import pge_util from util.conf_util import AlgorithmParameters, RunConfig -from util.ctx_util import JobContext, DockerParams +from util.ctx_util import JobContext, DockerParams, job_param_by_name from util.exec_util import exec_wrapper, call_noerr to_json = partial(json.dumps, indent=2) @@ -222,20 +222,6 @@ def create_required_directories(work_dir: str, context: Dict) -> Tuple[str, str, return input_dir, output_dir, scratch_dir, runconfig_dir -def job_param_by_name(context: Dict, name: str): - """ - Gets the job specification parameter from the _context.json file. - :param context: the dict representation of _context.json. - :param name: the name of the job specification parameter. - """ - - for param in context["job_specification"]["params"]: - if param["name"] == name: - return param["value"] - - raise Exception(f"param ({name}) not found in _context.json") - - def exec_pge_command( context: Dict, work_dir: str,