Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 37 additions & 10 deletions pipelines/jobs/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from io import BytesIO

import django
import numpy as np
Copy link

Copilot AI Oct 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider importing numpy only when needed (e.g., from numpy import nan) since numpy is a heavy dependency and only used for np.nan replacement on line 144.

Copilot uses AI. Check for mistakes.
import pandas as pd
import requests
from dagster import OpExecutionContext, job, op
Expand Down Expand Up @@ -127,20 +128,46 @@ def load_metadata_for_model(context: OpExecutionContext, sheet_name: str, model:
id_fields = "wealth_characteristic_label"
else:
id_fields = "code"
# Add primary keys if they are not already in the id_fields,
# so that we can save individual instances if required
if isinstance(id_fields, str):
id_fields = [id_fields]
if model._meta.pk.name not in id_fields:
keys_df = pd.DataFrame.from_records(
model.objects.all().values(model._meta.pk.name, *id_fields)
) # NOQA: E501
df = df.merge(
keys_df,
how="left",
on=id_fields,
)
df[model._meta.pk.name] = df[model._meta.pk.name].replace(np.nan, None)
# Turn the dataframe into a set of unsaved model instances
instances = []
for record in df.to_dict(orient="records"):
if isinstance(id_fields, str):
id_fields = [id_fields]

record = {k: v for k, v in record.items() if k in valid_field_names}
instances.append(model(**record))
instances = model.objects.bulk_create(
instances,
update_conflicts=True,
update_fields=[k for k in record if k not in id_fields],
unique_fields=id_fields,
)
context.log.info(f"Created or updated {len(instances)} {sheet_name} instances")
try:
instances = model.objects.bulk_create(
instances,
update_conflicts=True,
update_fields=[k for k in record if k not in id_fields and k != model._meta.pk.name],
unique_fields=id_fields,
)
context.log.info(f"Created or updated {len(instances)} {sheet_name} instances")
except Exception as e:
# Bulk create failed, so try creating/updating the instances one at a time to see which one failed
for i, instance in enumerate(instances):
try:
instance.save()
except Exception as e2:
key = [getattr(instance, id_field) for id_field in id_fields]
instance = {
k: v for k, v in instance.__dict__.items() if k not in ["_state", "created", "modified"]
}
raise RuntimeError(
f"Failed to create/update {model_name} instance {i} {key} from:\n{json.dumps(instance, indent=4, ensure_ascii=False)}"
Copy link

Copilot AI Oct 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The variable model_name is not defined in this scope. It should likely be model.__name__ or model._meta.model_name.

Copilot uses AI. Check for mistakes.
) from e2


@op
Expand Down
Loading