From 16661c0407970e706e36b152a2c3ba31c65b3cf0 Mon Sep 17 00:00:00 2001 From: Roger Hunwicks Date: Tue, 12 Nov 2024 21:36:08 -0500 Subject: [PATCH 1/4] Refactor wealth_characteristic_instances for clarity - see HEA-572 --- pipelines/assets/wealth_characteristic.py | 38 ++++++++++++++--------- 1 file changed, 24 insertions(+), 14 deletions(-) diff --git a/pipelines/assets/wealth_characteristic.py b/pipelines/assets/wealth_characteristic.py index e10f2458..12bea8c7 100644 --- a/pipelines/assets/wealth_characteristic.py +++ b/pipelines/assets/wealth_characteristic.py @@ -85,7 +85,6 @@ import django import pandas as pd from dagster import AssetExecutionContext, MetadataValue, Output, asset -from openpyxl.utils import get_column_letter from ..configs import BSSMetadataConfig from ..partitions import bss_instances_partitions_def @@ -197,7 +196,9 @@ def wealth_characteristic_instances( context.log.info("Loaded %d Wealth Characteristic Labels", len(label_map)) # Get a dataframe of the Wealth Groups for each column - wealth_group_df = get_wealth_group_dataframe(df, livelihood_zone_baseline, "WB", partition_key) + wealth_group_df = get_wealth_group_dataframe(df, livelihood_zone_baseline, "WB", partition_key).set_index( + "bss_column", drop=False + ) # Prepare the label column for matching against the label_map prepared_labels = prepare_lookup(df["A"]) @@ -257,21 +258,21 @@ def wealth_characteristic_instances( # Iterate over the value columns, from Column C to the the Summary Column. # We don't iterate over the last two columns because they contain the min_value and max_value that are # part of the Summary Wealth Characteristic Value rather than a separate Wealth Characteristic Value. - for i, value in enumerate(df.loc[row, "C" : df.columns[-3]]): - # Store the column to aid trouble-shooting. - # We need col_index + 1 to get the letter, and the enumerate is already starting from col C - column = get_column_letter(i + 3) + for column in df.columns[2:-2]: + value = df.loc[row, column] try: # Add find the reference_type: # Wealth Group (Form 4) values will have a full name and a wealth group category from Row 3 - if wealth_group_df.iloc[i]["full_name"] and wealth_group_df.iloc[i]["wealth_group_category"]: + if ( + wealth_group_df.loc[column, "full_name"] + and wealth_group_df.loc[column, "wealth_group_category"] + ): reference_type = WealthGroupCharacteristicValue.CharacteristicReference.WEALTH_GROUP # Community (Form 3) values will have a full name from Rows 4 and 5, but no wealth group category - elif wealth_group_df.iloc[i]["full_name"]: + elif wealth_group_df.loc[column, "full_name"]: reference_type = WealthGroupCharacteristicValue.CharacteristicReference.COMMUNITY # Summary values will not have full name or a wealth category, and will be in the last 3 columns - # Check for len(df.columns) -5 because the Summary col is 3rd from end, and i starts at Column C. - elif i == len(df.columns) - 5: + elif column == df.columns[-3]: reference_type = WealthGroupCharacteristicValue.CharacteristicReference.SUMMARY # There is no full name, and this isn't the summary, so we can ignore this column. This happens # because there are typically blank columns in BSS between each wealth group category. For example, @@ -290,8 +291,8 @@ def wealth_characteristic_instances( value != "" and reference_type and ( - not wealth_group_df.iloc[i]["wealth_group_category"] - or wealth_group_df.iloc[i]["wealth_group_category"] == wealth_group_category + not wealth_group_df.loc[column, "wealth_group_category"] + or wealth_group_df.loc[column, "wealth_group_category"] == wealth_group_category ) ): wealth_group_characteristic_value = attributes.copy() @@ -304,7 +305,11 @@ def wealth_characteristic_instances( wealth_group_category, # Note that we need to use the actual name from the instance, not the one calculated from # the BSS, which might have been matched using an alias. - wealth_group_df.iloc[i]["community"][2] if wealth_group_df.iloc[i]["community"] else "", + ( + wealth_group_df.loc[column, "community"][2] + if wealth_group_df.loc[column, "community"] + else "" + ), ) wealth_group_characteristic_value["reference_type"] = reference_type @@ -354,7 +359,12 @@ def wealth_characteristic_instances( [ wealth_group_df, wealth_group_df[wealth_group_df["community"] == wealth_group_df.iloc[0]["community"]][ - ["wealth_group_category_original", "wealth_group_category", "livelihood_zone_baseline", "community"] + [ + "wealth_group_category_original", + "wealth_group_category", + "livelihood_zone_baseline", + "community", + ] ].assign(community=None), ] ) From fab390e6aac422bbbd17b5f1b5d80fd4f4a38e26 Mon Sep 17 00:00:00 2001 From: Roger Hunwicks Date: Tue, 12 Nov 2024 21:37:48 -0500 Subject: [PATCH 2/4] Fix error in wealth_characteristic_instances - see HEA-572 Duplicate columns names in wealth_group_df were causing the pd.concat to add the Baseline Wealth Groups. --- pipelines/assets/baseline.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pipelines/assets/baseline.py b/pipelines/assets/baseline.py index c7e5c3bf..875a14d1 100644 --- a/pipelines/assets/baseline.py +++ b/pipelines/assets/baseline.py @@ -83,6 +83,9 @@ def get_wealth_group_dataframe( wealth_group_df = wealthgroupcategorylookup.do_lookup( wealth_group_df, "district", "wealth_group_category", update=True ) + # Remove the duplicate wealth_group_category_original column created by the second do_lookup(), + # which otherwise causes problems when trying to merge dataframes, e.g. when building the wealth_group_df. + wealth_group_df = wealth_group_df.loc[:, ~wealth_group_df.columns.duplicated()] except ValueError: pass From 1177234301eb4b6fd54c5ec7a9f034f4f6a4ccf7 Mon Sep 17 00:00:00 2001 From: Roger Hunwicks Date: Fri, 15 Nov 2024 14:53:21 -0500 Subject: [PATCH 3/4] Fix missing payment_per_time attribute - see HEA-572 The BSSs often use labels that match to `price` for `PaymentInKind` or `OtherCashIncome` activities, rather than explicit labels that match `payment_per_time`. Therefore, we correct misidentified labels for these Strategy Types. --- pipelines/assets/livelihood_activity.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/pipelines/assets/livelihood_activity.py b/pipelines/assets/livelihood_activity.py index 2b1ab763..bdb1d55b 100644 --- a/pipelines/assets/livelihood_activity.py +++ b/pipelines/assets/livelihood_activity.py @@ -853,6 +853,15 @@ def get_instances_from_dataframe( ) activity_attribute = None + # For Payment In Kind and Other Cash Income the attribute for payment_per_time sometimes uses a label + # that normally matches the price attribute. + if activity_attribute == "price": + if livelihood_strategy["strategy_type"] in ( + LivelihoodStrategyType.PAYMENT_IN_KIND, + LivelihoodStrategyType.OTHER_CASH_INCOME, + ): + activity_attribute = "payment_per_time" + # Some BSS incorrectly specify the product in the value columns instead of in the label column # Therefore, if we have specified the product__name as the attribute, check that the product # can be identified and is the same for all columns and then add it to the Livelihood Strategy. From c65396098f7d1495fde068792286f17ab4e95d37 Mon Sep 17 00:00:00 2001 From: Roger Hunwicks Date: Fri, 15 Nov 2024 23:38:18 -0500 Subject: [PATCH 4/4] Refactor assets to relax data completeness constraint - see HEA-572 Refactor the pipeline so that we can generate and import a fixture for each worksheet (WB, Data, Data2, Data3) within a BSS, instead of needing to consolidate to a single fixture first. We retain the consolidated_fixture and imported_baseline assets and eventually we should disable the standalone imports. --- pipelines/__init__.py | 36 +- pipelines/assets/fixtures.py | 449 +++++++++++----------- pipelines/assets/livelihood_activity.py | 60 +++ pipelines/assets/other_cash_income.py | 63 ++- pipelines/assets/wealth_characteristic.py | 51 +++ pipelines/assets/wild_foods.py | 63 ++- pipelines/jobs/fixtures.py | 20 +- 7 files changed, 499 insertions(+), 243 deletions(-) diff --git a/pipelines/__init__.py b/pipelines/__init__.py index ed1a7f02..7c490044 100644 --- a/pipelines/__init__.py +++ b/pipelines/__init__.py @@ -10,40 +10,50 @@ ) from .assets.baseline import baseline_instances, community_instances from .assets.fixtures import ( - consolidated_fixtures, - consolidated_instances, - imported_baselines, + consolidated_fixture, + imported_baseline, imported_communities, uploaded_baselines, - validated_instances, ) from .assets.livelihood_activity import ( all_livelihood_activity_labels_dataframe, + imported_livelihood_activities, livelihood_activity_dataframe, + livelihood_activity_fixture, livelihood_activity_instances, livelihood_activity_label_dataframe, + livelihood_activity_valid_instances, summary_livelihood_activity_labels_dataframe, ) from .assets.other_cash_income import ( all_other_cash_income_labels_dataframe, + imported_other_cash_income_activities, other_cash_income_dataframe, + other_cash_income_fixture, other_cash_income_instances, other_cash_income_label_dataframe, + other_cash_income_valid_instances, summary_other_cash_income_labels_dataframe, ) from .assets.wealth_characteristic import ( all_wealth_characteristic_labels_dataframe, + imported_wealth_characteristics, summary_wealth_characteristic_labels_dataframe, wealth_characteristic_dataframe, + wealth_characteristic_fixture, wealth_characteristic_instances, wealth_characteristic_label_dataframe, + wealth_characteristic_valid_instances, ) from .assets.wild_foods import ( all_wild_foods_labels_dataframe, + imported_wild_foods_activities, summary_wild_foods_labels_dataframe, wild_foods_dataframe, + wild_foods_fixture, wild_foods_instances, wild_foods_label_dataframe, + wild_foods_valid_instances, ) from .jobs.fixtures import ( extract_dataframes, @@ -76,27 +86,37 @@ all_livelihood_activity_labels_dataframe, summary_livelihood_activity_labels_dataframe, livelihood_activity_instances, + livelihood_activity_valid_instances, + livelihood_activity_fixture, + imported_livelihood_activities, other_cash_income_dataframe, other_cash_income_label_dataframe, all_other_cash_income_labels_dataframe, summary_other_cash_income_labels_dataframe, other_cash_income_instances, + other_cash_income_valid_instances, + other_cash_income_fixture, + imported_other_cash_income_activities, wild_foods_dataframe, wild_foods_label_dataframe, all_wild_foods_labels_dataframe, summary_wild_foods_labels_dataframe, wild_foods_instances, + wild_foods_valid_instances, + wild_foods_fixture, + imported_wild_foods_activities, wealth_characteristic_dataframe, wealth_characteristic_label_dataframe, all_wealth_characteristic_labels_dataframe, wealth_characteristic_instances, + wealth_characteristic_valid_instances, + wealth_characteristic_fixture, + imported_wealth_characteristics, summary_wealth_characteristic_labels_dataframe, - consolidated_instances, - validated_instances, - consolidated_fixtures, + consolidated_fixture, uploaded_baselines, imported_communities, - imported_baselines, + imported_baseline, ], jobs=[ update_metadata, diff --git a/pipelines/assets/fixtures.py b/pipelines/assets/fixtures.py index 0f2c2851..59021322 100644 --- a/pipelines/assets/fixtures.py +++ b/pipelines/assets/fixtures.py @@ -27,7 +27,178 @@ from common.management.commands import verbose_load_data # NOQA: E402 -def get_fixture_from_instances(instance_dict: dict[str, list[dict]]) -> list[dict]: +def validate_instances( + context: AssetExecutionContext, instances: dict[str, list[dict]], partition_key: str +) -> tuple[dict[str, list[dict]], dict]: + """ + Validate the instances for a set of related models, prior to loading them as a fixture. + + Validating the instances before attempting the load allows us to provide + more informative error messages than the ones provided by Django's `loaddata`. + """ + + # For Livelihood Activities we need to create and validate instances of the subclass, as well as the base class + # so create an additional list of instances for each subclass. + if "LivelihoodActivity" in instances: + subclass_livelihood_activities = defaultdict(list) + for instance in instances["LivelihoodActivity"]: + subclass_instance = instance.copy() + # The subclass instances also need a pointer to the base class instance + subclass_instance["livelihoodactivity_ptr"] = ( + instance["wealth_group"][:3] + instance["livelihood_strategy"][2:] + [instance["wealth_group"][3]] + ) + subclass_livelihood_activities[instance["strategy_type"]].append(subclass_instance) + + instances = {**instances, **subclass_livelihood_activities} + + # Create a dict to contain a dataframe of the instances for each model + dfs = {} + errors = [] + for model_name, model_instances in instances.items(): + # Ignore models where we don't have any instances to validate. + if model_instances: + model = class_from_name(f"baseline.models.{model_name}") + # Build a list of expected field names + valid_field_names = [field.name for field in model._meta.concrete_fields] + # Also include values that point directly to the primary key of related objects + valid_field_names += [ + field.get_attname() + for field in model._meta.concrete_fields + if field.get_attname() not in valid_field_names + ] + + # Apply some model-level defaults. We do this by iterating over the instances rather than using the dataframe + # because we want to return the instances without any other changes that might be made by the dataframe as a + # result of dtype conversion or NaNs. + current_timestamp = datetime.datetime.now(tz=datetime.timezone.utc).isoformat() + for instance in model_instances: + for field in ["created", "modified"]: + if field in valid_field_names and field not in instance: + instance[field] = current_timestamp + + df = pd.DataFrame.from_records(model_instances) + + # Add the natural key for the instances to the dataframe, + # so we can validate foreign keys in child models. + if model_name == "LivelihoodZone": + df["key"] = df["code"] + elif model_name == "LivelihoodZoneBaseline": + df["key"] = df[["livelihood_zone_id", "reference_year_end_date"]].apply( + lambda x: [x.iloc[0], x.iloc[1]], axis="columns" + ) + elif model_name == "Community": + df["key"] = df[["livelihood_zone_baseline", "full_name"]].apply( + lambda x: x.iloc[0] + [x.iloc[1]], axis="columns" + ) + elif model_name == "WealthGroup": + df["key"] = df[["livelihood_zone_baseline", "wealth_group_category", "community"]].apply( + lambda x: x.iloc[0] + [x.iloc[1], x.iloc[2][-1] if x.iloc[2] else ""], axis="columns" + ) + elif model_name == "LivelihoodStrategy": + df["key"] = df[ + ["livelihood_zone_baseline", "strategy_type", "season", "product_id", "additional_identifier"] + ].apply( + lambda x: x.iloc[0] + + [x.iloc[1], x.iloc[2][0] if x.iloc[2] else "", x.iloc[3] if x.iloc[3] else "", x.iloc[4]], + axis="columns", + ) + elif model_name == "LivelihoodActivity": + df["key"] = df[["livelihood_zone_baseline", "wealth_group", "livelihood_strategy"]].apply( + lambda x: [ + x.iloc[0][0], + x.iloc[0][1], + x.iloc[1][2], + x.iloc[2][2], + x.iloc[2][3], + x.iloc[2][4], + x.iloc[2][5], + x.iloc[1][3], + ], + axis="columns", + ) + + # Save the model and dataframe so we can use them validate natural foreign keys later + dfs[model_name] = (model, df) + + # Apply field-level checks + for field in model._meta.concrete_fields: + column = field.name if field.name in df else field.get_attname() + # Ensure that mandatory fields have values + if not field.blank: + if column not in df: + error = f"Missing mandatory field {field.name} for {model_name}" + errors.append(error) + continue + else: + for record in df[df[column].isnull()].itertuples(): + error = ( + f"Missing value for mandatory field {column} for {model_name} in record " + f"{record.Index} from cell '{record.bss_sheet}'!{record.bss_column}{record.bss_row}" + f'{str({k: v for k,v in record._asdict().items() if k != "Index"})}' + ) + errors.append(error) + # Ensure the instances contain valid parent references for foreign keys + if isinstance(field, models.ForeignKey): + if column not in df: + error = f"Missing mandatory foreign key {column} for {model_name}" + errors.append(error) + continue + if not field.null: + for record in df[df[column].isnull()].itertuples(): + error = ( + f"Missing mandatory foreign key {column} for {model_name} in record " + f"{record.Index} from cell '{record.bss_sheet}'!{record.bss_column}{record.bss_row}" + f'{str({k: v for k,v in record._asdict().items() if k != "Index"})}' + ) + errors.append(error) + # Validate foreign key values + if field.related_model.__name__ in dfs: + # The model is part of the fixture, so use the saved key from the dataframe + remote_keys = dfs[field.related_model.__name__][1]["key"] + else: + # The model is not in the fixture, so use the primary and natural keys for already saved instances + remote_keys = [instance.pk for instance in field.related_model.objects.all()] + if "natural_key" in dir(field.related_model): + remote_keys += [ + list(instance.natural_key()) for instance in field.related_model.objects.all() + ] + # Check the non-null foreign key values are in the remote keys + for record in df[ + df[column].replace("", pd.NA).notna() & ~df[column].isin(remote_keys) + ].itertuples(): + error = ( + f"Unrecognized '{column}' foreign key {getattr(record, column)} " + f"for {model_name} in record " + f"{record.Index} from cell '{record.bss_sheet}'!{record.bss_column}{record.bss_row}" + f'{str({k: v for k,v in record._asdict().items() if k != "Index"})}' + ) + errors.append(error) + + # Check that the kcals/kg matches the values in the ClassifiedProduct model, if it's present in the BSS + if model_name == "LivelihoodActivity" and "product__kcals_per_unit" in df: + df["product"] = df["livelihood_strategy"].apply(lambda x: x[4]) + df = ClassifiedProductLookup().get_instances(df, "product", "product") + df["reference_kcals_per_unit"] = df["product"].apply(lambda x: x.kcals_per_unit) + df["reference_unit_of_measure"] = df["product"].apply(lambda x: x.unit_of_measure) + for record in df[df["product__kcals_per_unit"] != df["reference_kcals_per_unit"]].itertuples(): + error = ( + f"Non-standard value {record.product__kcals_per_unit} in '{record.column}" + f"for {model_name} in record " + f'{str({k: v for k,v in record._asdict().items() if k != "Index"})}. ' + f"Expected {record.reference_kcals_per_unit}/{record.reference_unit_of_measure} for {record.product}" + ) + errors.append(error) + + if errors: + raise RuntimeError("Missing or inconsistent metadata in BSS %s:\n%s" % (partition_key, "\n".join(errors))) + + metadata = {f"num_{key.lower()}": len(value) for key, value in instances.items()} + metadata["total_instances"] = sum(len(value) for value in instances.values()) + metadata["preview"] = MetadataValue.md(f"```json\n{json.dumps(instances, indent=4)}\n```") + return instances, metadata + + +def get_fixture_from_instances(instance_dict: dict[str, list[dict]]) -> tuple[list[dict], dict]: """ Convert a dict containing a list of instances for each model into a Django fixture. """ @@ -90,202 +261,65 @@ def get_fixture_from_instances(instance_dict: dict[str, list[dict]]) -> list[dic metadata[f'num_{str(model._meta).split(".")[-1]}'] += 1 metadata["total_instances"] = len(fixture) - return metadata, fixture - - -@asset(partitions_def=bss_instances_partitions_def, io_manager_key="json_io_manager") -def consolidated_instances( - wealth_characteristic_instances, - livelihood_activity_instances, - other_cash_income_instances, - wild_foods_instances, -) -> Output[dict]: - """ - Consolidated record instances from a BSS, ready to be validated. - """ - # Build a dict of all the models, and their instances, that are to be loaded - consolidated_instances = { - # Put the wealth_characteristic_fixture first, because it loads the - # WealthGroup instances, which are needed as a foreign key from LivelihoodActivity, etc. - **wealth_characteristic_instances, - **livelihood_activity_instances, - } - # Add the wild foods and other cash income instances, if they are present - for model_name, instances in {**other_cash_income_instances, **wild_foods_instances}.items(): - if instances: - consolidated_instances[model_name] += instances - - # For Livelihood Activities we need to create instances of the subclass, as well as the base class - # so create an additional list of instances for each subclass. - subclass_livelihood_activities = defaultdict(list) - for instance in consolidated_instances["LivelihoodActivity"]: - # The subclass instances also need a pointer to the base class instance - instance["livelihoodactivity_ptr"] = ( - instance["wealth_group"][:3] + instance["livelihood_strategy"][2:] + [instance["wealth_group"][3]] - ) - subclass_livelihood_activities[instance["strategy_type"]].append(instance) - - consolidated_instances = {**consolidated_instances, **subclass_livelihood_activities} - - metadata = {f"num_{key.lower()}": len(value) for key, value in consolidated_instances.items()} - metadata["total_instances"] = sum(len(value) for value in consolidated_instances.values()) - metadata["preview"] = MetadataValue.md(f"```json\n{json.dumps(consolidated_instances, indent=4)}\n```") - return Output( - consolidated_instances, - metadata=metadata, - ) + metadata["preview"] = MetadataValue.md(f"```json\n{json.dumps(fixture, indent=4)}\n```") + return fixture, metadata -@asset(partitions_def=bss_instances_partitions_def, io_manager_key="json_io_manager") -def validated_instances( - context: AssetExecutionContext, - consolidated_instances, -) -> Output[dict]: +def import_fixture(fixture: list[dict]) -> dict: """ - Validated record instances from a BSS, ready to be loaded via a Django fixture. + Import a Django fixture and return a metadata dictionary. """ - partition_key = context.asset_partition_key_for_output() - # Create a dict of all the models, and a dataframe of their instances - errors = [] - dfs = {} - for model_name, instances in consolidated_instances.items(): - model = class_from_name(f"baseline.models.{model_name}") - valid_field_names = [field.name for field in model._meta.concrete_fields] - # Also include values that point directly to the primary key of related objects - valid_field_names += [ - field.get_attname() - for field in model._meta.concrete_fields - if field.get_attname() not in valid_field_names - ] - df = pd.DataFrame.from_records(instances) - - # Add the key to the instances for this model, so we can validate foreign keys within the fixture - if model_name == "LivelihoodZone": - df["key"] = df["code"] - elif model_name == "LivelihoodZoneBaseline": - df["key"] = df[["livelihood_zone_id", "reference_year_end_date"]].apply( - lambda x: [x.iloc[0], x.iloc[1]], axis="columns" - ) - elif model_name == "Community": - df["key"] = df[["livelihood_zone_baseline", "full_name"]].apply( - lambda x: x.iloc[0] + [x.iloc[1]], axis="columns" - ) - elif model_name == "WealthGroup": - df["key"] = df[["livelihood_zone_baseline", "wealth_group_category", "community"]].apply( - lambda x: x.iloc[0] + [x.iloc[1], x.iloc[2][-1] if x.iloc[2] else ""], axis="columns" - ) - elif model_name == "LivelihoodStrategy": - df["key"] = df[ - ["livelihood_zone_baseline", "strategy_type", "season", "product_id", "additional_identifier"] - ].apply( - lambda x: x.iloc[0] - + [x.iloc[1], x.iloc[2][0] if x.iloc[2] else "", x.iloc[3] if x.iloc[3] else "", x.iloc[4]], - axis="columns", - ) - elif model_name == "LivelihoodActivity": - df["key"] = df[["livelihood_zone_baseline", "wealth_group", "livelihood_strategy"]].apply( - lambda x: [ - x.iloc[0][0], - x.iloc[0][1], - x.iloc[1][2], - x.iloc[2][2], - x.iloc[2][3], - x.iloc[2][4], - x.iloc[2][5], - x.iloc[1][3], - ], - axis="columns", - ) - - # Apply some model-level defaults - if "created" in valid_field_names and "created" not in df: - df["created"] = pd.Timestamp.now(datetime.timezone.utc) - if "modified" in valid_field_names and "modified" not in df: - df["modified"] = pd.Timestamp.now(datetime.timezone.utc) - - # Save the model and dataframe so we can use them validate natural foreign keys later - dfs[model_name] = (model, df) - - # Apply field-level checks - for field in model._meta.concrete_fields: - column = field.name if field.name in df else field.get_attname() - # Ensure that mandatory fields have values - if not field.blank: - if column not in df: - error = f"Missing mandatory field {field.name} for {model_name}" - errors.append(error) - continue - else: - for row in df[df[column].isnull()].itertuples(): - error = f"Missing value for mandatory field {column} for {model_name} in row {row.Index} {row._asdict()}" # NOQA: E501 - errors.append(error) - # Ensure the consolidated instances contain valid parent references for foreign keys - if isinstance(field, models.ForeignKey): - if column not in df: - error = f"Missing mandatory foreign key {column} for {model_name}" - errors.append(error) - continue - if not field.null: - for row in df[df[column].isnull()].itertuples(): - error = f"Missing mandatory foreign key {column} for {model_name} in row {row.Index} {row._asdict()}" # NOQA: E501 - errors.append(error) - # Validate foreign key values - if field.related_model.__name__ in dfs: - # The model is part of the fixture, so use the saved key from the dataframe - remote_keys = dfs[field.related_model.__name__][1]["key"] - else: - # The model is not in the fixture, so use the primary and natural keys for already saved instances - remote_keys = [instance.pk for instance in field.related_model.objects.all()] - if "natural_key" in dir(field.related_model): - remote_keys += [list(instance.natural_key()) for instance in field.related_model.objects.all()] - # Check the non-null foreign key values are in the remote keys - for row in df[df[column].notna() & ~df[column].isin(remote_keys)].itertuples(): - error = ( - f"Unrecognized foreign key {getattr(row, column)} " - f"in column '{column}' for {model_name} instance " - f'{str({k: v for k,v in row._asdict().items() if k != "Index"})}' - ) - errors.append(error) - - # Check that the kcals/kg matches the values in the ClassifiedProduct model, if it's present in the BSS - if model_name == "LivelihoodActivity" and "product__kcals_per_unit" in df: - df["product"] = df["livelihood_strategy"].apply(lambda x: x[4]) - df = ClassifiedProductLookup().get_instances(df, "product", "product") - df["reference_kcals_per_unit"] = df["product"].apply(lambda x: x.kcals_per_unit) - df["reference_unit_of_measure"] = df["product"].apply(lambda x: x.unit_of_measure) - for row in df[df["product__kcals_per_unit"] != df["reference_kcals_per_unit"]].itertuples(): - error = ( - f"Non-standard value {row.product__kcals_per_unit} " - f"in '{row.column}{row.row}' for {model_name} instance " - f'{str({k: v for k,v in row._asdict().items() if k != "Index"})}. ' - f"Expected {row.reference_kcals_per_unit}/{row.reference_unit_of_measure} for {row.product}" - ) - errors.append(error) + output_buffer = StringIO() - if errors: - errors = "\n".join(errors) - raise RuntimeError("Missing or inconsistent metadata in BSS %s:\n%s" % (partition_key, errors)) + # We need to use a .verbose_json file extension for Django to use the correct serializer. + with tempfile.NamedTemporaryFile(mode="w+", suffix=".verbose_json") as f: + # Write the fixture to a temporary file so that Django can access it + f.write(json.dumps(fixture)) + f.seek(0) + call_command(verbose_load_data.Command(), f.name, verbosity=2, format="verbose_json", stdout=output_buffer) - metadata = {f"num_{key.lower()}": len(value) for key, value in consolidated_instances.items()} - metadata["total_instances"] = sum(len(value) for value in consolidated_instances.values()) - metadata["preview"] = MetadataValue.md(f"```json\n{json.dumps(consolidated_instances, indent=4)}\n```") - return Output( - consolidated_instances, - metadata=metadata, - ) + # Create the metadata reporting the number of instances created for each model + metadata = defaultdict(int) + for instance in fixture: + metadata[f'num_{instance["model"].split(".")[-1]}'] += 1 + metadata["total_instances"] = len(fixture) + metadata["preview"] = MetadataValue.md(f"```json\n{json.dumps(fixture, indent=4)}\n```") + metadata["output"] = MetadataValue.md(f"```\n{output_buffer.getvalue()}\n```") + return metadata @asset(partitions_def=bss_instances_partitions_def, io_manager_key="json_io_manager") -def consolidated_fixtures( +def consolidated_fixture( context: AssetExecutionContext, config: BSSMetadataConfig, - validated_instances, + wealth_characteristic_valid_instances, + livelihood_activity_valid_instances, + other_cash_income_valid_instances, + wild_foods_valid_instances, ) -> Output[list[dict]]: """ Consolidated Django fixture for a BSS, including Livelihood Activities and Wealth Group Characteristic Values. """ - metadata, fixture = get_fixture_from_instances(validated_instances) - metadata["preview"] = MetadataValue.md(f"```json\n{json.dumps(fixture, indent=4)}\n```") + # Combine the Wealth Charactertistic fixture with the records from the + # Livelihood Activities fixtures, ignoring the duplicate Wealth Group + # records. + consolidated_instances = { + # Put the wealth_characteristic_fixture first, because it loads the + # WealthGroup instances, which are needed as a foreign key from LivelihoodActivity, etc. + **wealth_characteristic_valid_instances, + **{ + model_name: instances + for model_name, instances in livelihood_activity_valid_instances.items() + if model_name != "WealthGroup" + }, + } + # Add the wild foods and other cash income instances, if they are present + for model_name, instances in {**other_cash_income_valid_instances, **wild_foods_valid_instances}.items(): + if instances and model_name != "WealthGroup": + consolidated_instances[model_name] += instances + + fixture, metadata = get_fixture_from_instances(consolidated_instances) + return Output( fixture, metadata=metadata, @@ -305,18 +339,8 @@ def uploaded_baselines( Downstream assets apply corrections to the original file and then process the contents to create Communities, Wealth Groups, Livelihood Strategies, etc. """ - metadata, fixture = get_fixture_from_instances(baseline_instances) - output_buffer = StringIO() - - # We need to use a .verbose_json file extension for Django to use the correct serializer. - with tempfile.NamedTemporaryFile(mode="w+", suffix=".verbose_json") as f: - # Write the fixture to a temporary file so that Django can access it - f.write(json.dumps(fixture)) - f.seek(0) - call_command(verbose_load_data.Command(), f.name, verbosity=2, format="verbose_json", stdout=output_buffer) - - # Add the output to the metadata - metadata["output"] = MetadataValue.md(f"```\n{output_buffer.getvalue()}\n```") + fixture, metadata = get_fixture_from_instances(baseline_instances) + metadata = import_fixture(fixture) # Add the file objects `bss` and `profile_report` FileFields to the model instances instance = baseline_instances["LivelihoodZoneBaseline"][0] @@ -340,19 +364,8 @@ def imported_communities( """ Communities from a BSS, imported to the Django database using a fixture. """ - metadata, fixture = get_fixture_from_instances(community_instances) - output_buffer = StringIO() - - # We need to use a .verbose_json file extension for Django to use the correct serializer. - with tempfile.NamedTemporaryFile(mode="w+", suffix=".verbose_json") as f: - # Write the fixture to a temporary file so that Django can access it - f.write(json.dumps(fixture)) - f.seek(0) - call_command(verbose_load_data.Command(), f.name, verbosity=2, format="verbose_json", stdout=output_buffer) - - # Add the output to the metadata - metadata["preview"] = MetadataValue.md(f"```json\n{json.dumps(fixture, indent=4)}\n```") - metadata["output"] = MetadataValue.md(f"```\n{output_buffer.getvalue()}\n```") + fixture, metadata = get_fixture_from_instances(community_instances) + metadata = import_fixture(fixture) return Output( None, @@ -361,28 +374,14 @@ def imported_communities( @asset(partitions_def=bss_instances_partitions_def) -def imported_baselines( +def imported_baseline( context: AssetExecutionContext, - consolidated_fixtures, + consolidated_fixture, ) -> Output[None]: """ - Imported Django fixtures for a BSS, added to the Django database. + Imported Django fixture for a BSS, added to the Django database. """ - output_buffer = StringIO() - - # We need to use a .verbose_json file extension for Django to use the correct serializer. - with tempfile.NamedTemporaryFile(mode="w+", suffix=".verbose_json") as f: - # Write the fixture to a temporary file so that Django can access it - f.write(json.dumps(consolidated_fixtures)) - f.seek(0) - call_command(verbose_load_data.Command(), f.name, verbosity=2, format="verbose_json", stdout=output_buffer) - - # Create the metadata reporting the number of instances created for each model - metadata = defaultdict(int) - for instance in consolidated_fixtures: - metadata[f'num_{instance["model"].split(".")[-1]}'] += 1 - metadata["total_instances"] = len(consolidated_fixtures) - metadata["output"] = MetadataValue.md(f"```\n{output_buffer.getvalue()}\n```") + metadata = import_fixture(consolidated_fixture) return Output( None, diff --git a/pipelines/assets/livelihood_activity.py b/pipelines/assets/livelihood_activity.py index bdb1d55b..d2c41683 100644 --- a/pipelines/assets/livelihood_activity.py +++ b/pipelines/assets/livelihood_activity.py @@ -69,6 +69,7 @@ get_summary_bss_label_dataframe, ) from .baseline import get_wealth_group_dataframe +from .fixtures import get_fixture_from_instances, import_fixture, validate_instances # set the default Django settings module os.environ.setdefault("DJANGO_SETTINGS_MODULE", "hea.settings.production") @@ -1078,3 +1079,62 @@ def livelihood_activity_instances( partition_key, ) return output + + +@asset(partitions_def=bss_instances_partitions_def, io_manager_key="json_io_manager") +def livelihood_activity_valid_instances( + context: AssetExecutionContext, + livelihood_activity_instances, + wealth_characteristic_instances, +) -> Output[dict]: + """ + Valid LivelhoodStrategy and LivelihoodActivity instances from a BSS, ready to be loaded via a Django fixture. + """ + partition_key = context.asset_partition_key_for_output() + # Livelihood Activities depend on the Wealth Groups, so copy them from the wealth_characteristic_instances, making + # sure that the WealthGroup is the first entry in the dict, so that the lookup keys have been created before + # validate_instances processes the child model and needs them. + if any(instances for instances in livelihood_activity_instances.values()): + livelihood_activity_instances = { + **{"WealthGroup": wealth_characteristic_instances["WealthGroup"]}, + **livelihood_activity_instances, + } + valid_instances, metadata = validate_instances(context, livelihood_activity_instances, partition_key) + metadata = {f"num_{key.lower()}": len(value) for key, value in valid_instances.items()} + metadata["total_instances"] = sum(len(value) for value in valid_instances.values()) + metadata["preview"] = MetadataValue.md(f"```json\n{json.dumps(valid_instances, indent=4)}\n```") + return Output( + valid_instances, + metadata=metadata, + ) + + +@asset(partitions_def=bss_instances_partitions_def, io_manager_key="json_io_manager") +def livelihood_activity_fixture( + context: AssetExecutionContext, + config: BSSMetadataConfig, + livelihood_activity_valid_instances, +) -> Output[list[dict]]: + """ + Django fixture for the Livelihood Activities from a BSS. + """ + fixture, metadata = get_fixture_from_instances(livelihood_activity_valid_instances) + return Output( + fixture, + metadata=metadata, + ) + + +@asset(partitions_def=bss_instances_partitions_def) +def imported_livelihood_activities( + context: AssetExecutionContext, + livelihood_activity_fixture, +) -> Output[None]: + """ + Imported Django fixtures for a BSS, added to the Django database. + """ + metadata = import_fixture(livelihood_activity_fixture) + return Output( + None, + metadata=metadata, + ) diff --git a/pipelines/assets/other_cash_income.py b/pipelines/assets/other_cash_income.py index 13f9bc02..b6d0c482 100644 --- a/pipelines/assets/other_cash_income.py +++ b/pipelines/assets/other_cash_income.py @@ -38,11 +38,12 @@ | 32 | income | | | | | | | | | | """ # NOQA: E501 +import json import os import django import pandas as pd -from dagster import AssetExecutionContext, Output, asset +from dagster import AssetExecutionContext, MetadataValue, Output, asset from ..configs import BSSMetadataConfig from ..partitions import bss_instances_partitions_def @@ -52,6 +53,7 @@ get_bss_label_dataframe, get_summary_bss_label_dataframe, ) +from .fixtures import get_fixture_from_instances, import_fixture, validate_instances from .livelihood_activity import get_instances_from_dataframe # set the default Django settings module @@ -146,3 +148,62 @@ def other_cash_income_instances( partition_key, ) return output + + +@asset(partitions_def=bss_instances_partitions_def, io_manager_key="json_io_manager") +def other_cash_income_valid_instances( + context: AssetExecutionContext, + other_cash_income_instances, + wealth_characteristic_instances, +) -> Output[dict]: + """ + Valid LivelhoodStrategy and LivelihoodActivity instances from a BSS, ready to be loaded via a Django fixture. + """ + partition_key = context.asset_partition_key_for_output() + # Livelihood Activities depend on the Wealth Groups, so copy them from the wealth_characteristic_instances, making + # sure that the WealthGroup is the first entry in the dict, so that the lookup keys have been created before + # validate_instances processes the child model and needs them. + if any(instances for instances in other_cash_income_instances.values()): + other_cash_income_instances = { + **{"WealthGroup": wealth_characteristic_instances["WealthGroup"]}, + **other_cash_income_instances, + } + valid_instances, metadata = validate_instances(context, other_cash_income_instances, partition_key) + metadata = {f"num_{key.lower()}": len(value) for key, value in valid_instances.items()} + metadata["total_instances"] = sum(len(value) for value in valid_instances.values()) + metadata["preview"] = MetadataValue.md(f"```json\n{json.dumps(valid_instances, indent=4)}\n```") + return Output( + valid_instances, + metadata=metadata, + ) + + +@asset(partitions_def=bss_instances_partitions_def, io_manager_key="json_io_manager") +def other_cash_income_fixture( + context: AssetExecutionContext, + config: BSSMetadataConfig, + other_cash_income_valid_instances, +) -> Output[list[dict]]: + """ + Django fixture for the Livelihood Activities from a BSS. + """ + fixture, metadata = get_fixture_from_instances(other_cash_income_valid_instances) + return Output( + fixture, + metadata=metadata, + ) + + +@asset(partitions_def=bss_instances_partitions_def) +def imported_other_cash_income_activities( + context: AssetExecutionContext, + other_cash_income_fixture, +) -> Output[None]: + """ + Imported Django fixtures for a BSS, added to the Django database. + """ + metadata = import_fixture(other_cash_income_fixture) + return Output( + None, + metadata=metadata, + ) diff --git a/pipelines/assets/wealth_characteristic.py b/pipelines/assets/wealth_characteristic.py index 12bea8c7..1cc9b337 100644 --- a/pipelines/assets/wealth_characteristic.py +++ b/pipelines/assets/wealth_characteristic.py @@ -96,6 +96,7 @@ get_summary_bss_label_dataframe, ) from .baseline import get_wealth_group_dataframe +from .fixtures import get_fixture_from_instances, import_fixture, validate_instances # set the default Django settings module os.environ.setdefault("DJANGO_SETTINGS_MODULE", "hea.settings.production") @@ -432,3 +433,53 @@ def wealth_characteristic_instances( result, metadata=metadata, ) + + +@asset(partitions_def=bss_instances_partitions_def, io_manager_key="json_io_manager") +def wealth_characteristic_valid_instances( + context: AssetExecutionContext, + wealth_characteristic_instances, +) -> Output[dict]: + """ + Valid WealthGroup and WealthGroupCharacteristicValue instances from a BSS, ready to be loaded via a Django fixture. + """ + partition_key = context.asset_partition_key_for_output() + valid_instances, metadata = validate_instances(context, wealth_characteristic_instances, partition_key) + metadata = {f"num_{key.lower()}": len(value) for key, value in valid_instances.items()} + metadata["total_instances"] = sum(len(value) for value in valid_instances.values()) + metadata["preview"] = MetadataValue.md(f"```json\n{json.dumps(valid_instances, indent=4)}\n```") + return Output( + valid_instances, + metadata=metadata, + ) + + +@asset(partitions_def=bss_instances_partitions_def, io_manager_key="json_io_manager") +def wealth_characteristic_fixture( + context: AssetExecutionContext, + config: BSSMetadataConfig, + wealth_characteristic_valid_instances, +) -> Output[list[dict]]: + """ + Django fixture for the Livelihood Activities from a BSS. + """ + fixture, metadata = get_fixture_from_instances(wealth_characteristic_valid_instances) + return Output( + fixture, + metadata=metadata, + ) + + +@asset(partitions_def=bss_instances_partitions_def) +def imported_wealth_characteristics( + context: AssetExecutionContext, + wealth_characteristic_fixture, +) -> Output[None]: + """ + Imported Django fixtures for a BSS, added to the Django database. + """ + metadata = import_fixture(wealth_characteristic_fixture) + return Output( + None, + metadata=metadata, + ) diff --git a/pipelines/assets/wild_foods.py b/pipelines/assets/wild_foods.py index 5c045e48..c1f3a71f 100644 --- a/pipelines/assets/wild_foods.py +++ b/pipelines/assets/wild_foods.py @@ -56,11 +56,12 @@ | 85 | TOTAL FISHING KCALS (%) | 0.009088932377 | 0.005577299413 | 0 | 0.009639776763 | 0.01133165595 | 0 | 0 | 0.009708632311 | 0 | """ # NOQA: E501 +import json import os import django import pandas as pd -from dagster import AssetExecutionContext, Output, asset +from dagster import AssetExecutionContext, MetadataValue, Output, asset from ..configs import BSSMetadataConfig from ..partitions import bss_instances_partitions_def @@ -70,6 +71,7 @@ get_bss_label_dataframe, get_summary_bss_label_dataframe, ) +from .fixtures import get_fixture_from_instances, import_fixture, validate_instances from .livelihood_activity import get_instances_from_dataframe # set the default Django settings module @@ -156,3 +158,62 @@ def wild_foods_instances( partition_key, ) return output + + +@asset(partitions_def=bss_instances_partitions_def, io_manager_key="json_io_manager") +def wild_foods_valid_instances( + context: AssetExecutionContext, + wild_foods_instances, + wealth_characteristic_instances, +) -> Output[dict]: + """ + Valid LivelhoodStrategy and LivelihoodActivity instances from a BSS, ready to be loaded via a Django fixture. + """ + partition_key = context.asset_partition_key_for_output() + # Livelihood Activities depend on the Wealth Groups, so copy them from the wealth_characteristic_instances, making + # sure that the WealthGroup is the first entry in the dict, so that the lookup keys have been created before + # validate_instances processes the child model and needs them. + if any(instances for instances in wild_foods_instances.values()): + wild_foods_instances = { + **{"WealthGroup": wealth_characteristic_instances["WealthGroup"]}, + **wild_foods_instances, + } + valid_instances, metadata = validate_instances(context, wild_foods_instances, partition_key) + metadata = {f"num_{key.lower()}": len(value) for key, value in valid_instances.items()} + metadata["total_instances"] = sum(len(value) for value in valid_instances.values()) + metadata["preview"] = MetadataValue.md(f"```json\n{json.dumps(valid_instances, indent=4)}\n```") + return Output( + valid_instances, + metadata=metadata, + ) + + +@asset(partitions_def=bss_instances_partitions_def, io_manager_key="json_io_manager") +def wild_foods_fixture( + context: AssetExecutionContext, + config: BSSMetadataConfig, + wild_foods_valid_instances, +) -> Output[list[dict]]: + """ + Django fixture for the Livelihood Activities from a BSS. + """ + fixture, metadata = get_fixture_from_instances(wild_foods_valid_instances) + return Output( + fixture, + metadata=metadata, + ) + + +@asset(partitions_def=bss_instances_partitions_def) +def imported_wild_foods_activities( + context: AssetExecutionContext, + wild_foods_fixture, +) -> Output[None]: + """ + Imported Django fixtures for a BSS, added to the Django database. + """ + metadata = import_fixture(wild_foods_fixture) + return Output( + None, + metadata=metadata, + ) diff --git a/pipelines/jobs/fixtures.py b/pipelines/jobs/fixtures.py index 4dbac2fb..d4e1c3a2 100644 --- a/pipelines/jobs/fixtures.py +++ b/pipelines/jobs/fixtures.py @@ -7,17 +7,16 @@ from ..assets.base import bss_metadata, completed_bss_metadata from ..assets.baseline import baseline_instances from ..assets.fixtures import ( - consolidated_fixtures, - consolidated_instances, - imported_baselines, + consolidated_fixture, + imported_baseline, uploaded_baselines, - validated_instances, ) from ..assets.livelihood_activity import ( all_livelihood_activity_labels_dataframe, livelihood_activity_dataframe, livelihood_activity_instances, livelihood_activity_label_dataframe, + livelihood_activity_valid_instances, summary_livelihood_activity_labels_dataframe, ) from ..assets.other_cash_income import ( @@ -25,6 +24,7 @@ other_cash_income_dataframe, other_cash_income_instances, other_cash_income_label_dataframe, + other_cash_income_valid_instances, summary_other_cash_income_labels_dataframe, ) from ..assets.wealth_characteristic import ( @@ -33,6 +33,7 @@ wealth_characteristic_dataframe, wealth_characteristic_instances, wealth_characteristic_label_dataframe, + wealth_characteristic_valid_instances, ) from ..assets.wild_foods import ( all_wild_foods_labels_dataframe, @@ -40,6 +41,7 @@ wild_foods_dataframe, wild_foods_instances, wild_foods_label_dataframe, + wild_foods_valid_instances, ) from ..partitions import bss_files_partitions_def, bss_instances_partitions_def @@ -50,10 +52,12 @@ livelihood_activity_instances, other_cash_income_instances, wild_foods_instances, - consolidated_instances, - validated_instances, - consolidated_fixtures, - imported_baselines, + wealth_characteristic_valid_instances, + livelihood_activity_valid_instances, + other_cash_income_valid_instances, + wild_foods_valid_instances, + consolidated_fixture, + imported_baseline, ), partitions_def=bss_instances_partitions_def, )