diff --git a/pipelines/jobs/metadata.py b/pipelines/jobs/metadata.py index 089428f..24fb3a0 100644 --- a/pipelines/jobs/metadata.py +++ b/pipelines/jobs/metadata.py @@ -8,6 +8,7 @@ from io import BytesIO import django +import numpy as np import pandas as pd import requests from dagster import OpExecutionContext, job, op @@ -127,20 +128,47 @@ def load_metadata_for_model(context: OpExecutionContext, sheet_name: str, model: id_fields = "wealth_characteristic_label" else: id_fields = "code" + # Add primary keys if they are not already in the id_fields, + # so that we can save individual instances if required + if isinstance(id_fields, str): + id_fields = [id_fields] + if model._meta.pk.name not in id_fields: + keys_df = pd.DataFrame.from_records( + model.objects.all().values(model._meta.pk.name, *id_fields) + ) # NOQA: E501 + df = df.merge( + keys_df, + how="left", + on=id_fields, + ) + df[model._meta.pk.name] = df[model._meta.pk.name].replace(np.nan, None) + # Turn the dataframe into a set of unsaved model instances instances = [] + fields = [k for k in df.columns if k in valid_field_names] for record in df.to_dict(orient="records"): - if isinstance(id_fields, str): - id_fields = [id_fields] - - record = {k: v for k, v in record.items() if k in valid_field_names} + record = {k: v for k, v in record.items() if k in fields} instances.append(model(**record)) - instances = model.objects.bulk_create( - instances, - update_conflicts=True, - update_fields=[k for k in record if k not in id_fields], - unique_fields=id_fields, - ) - context.log.info(f"Created or updated {len(instances)} {sheet_name} instances") + try: + instances = model.objects.bulk_create( + instances, + update_conflicts=True, + update_fields=[k for k in fields if k not in id_fields and k != model._meta.pk.name], + unique_fields=id_fields, + ) + context.log.info(f"Created or updated {len(instances)} {sheet_name} instances") + except Exception: + # Bulk create failed, so try creating/updating the instances one at a time to see which one failed + for i, instance in enumerate(instances): + try: + instance.save() + except Exception as e: + key = [getattr(instance, id_field) for id_field in id_fields] + instance = { + k: v for k, v in instance.__dict__.items() if k not in ["_state", "created", "modified"] + } + raise RuntimeError( + f"Failed to create/update {model_name} instance {i} {key} from:\n{json.dumps(instance, indent=4, ensure_ascii=False)}" + ) from e @op