Skip to content

Commit bcd480c

Browse files
committed
Merge branch 'main' into HEA-752/Dagster-GraphQL-API-is-intermittently-failing-with-a-ProtocolError-when-accessed-via-the-revproxy-Django-view
2 parents e2c246c + 3d606bf commit bcd480c

File tree

11 files changed

+414
-279
lines changed

11 files changed

+414
-279
lines changed

env.example

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ PIP_INDEX_URL=https://pypi.python.org/simple/
4646
# Ingestion Parameters
4747
BSS_METADATA_WORKBOOK='gdrive://Database Design/BSS Metadata' # 15XVXFjbom1sScVXbsetnbgAnPpRux2AgNy8w5U8bXdI
4848
BSS_METADATA_STORAGE_OPTIONS='{"token": "service_account", "access": "read_only", "creds": ${GOOGLE_APPLICATION_CREDENTIALS}, "root_file_id": "0AOJ0gJ8sjnO7Uk9PVA"}'
49+
BSS_LABEL_RECOGNITION_WORKBOOK=./BSS_Labels.xlsx # or 'gdrive://Database Design/BSS Labels (${ENV}).xlsx'
50+
BSS_LABEL_RECOGNITION_STORAGE_OPTIONS='{}' # or '{"token": "service_account", "access": "full_control", "creds": ${GOOGLE_APPLICATION_CREDENTIALS}, "root_file_id": "0AOJ0gJ8sjnO7Uk9PVA"}'
4951
BSS_FILES_FOLDER='gdrive://Discovery Folder/Baseline Storage Sheets (BSS)'
5052
BSS_FILES_STORAGE_OPTIONS='{"token": "service_account", "access": "read_only", "creds": ${GOOGLE_APPLICATION_CREDENTIALS}, "root_file_id": "0AOJ0gJ8sjnO7Uk9PVA"}'
5153

pipelines/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
livelihood_activity_fixture,
2424
livelihood_activity_instances,
2525
livelihood_activity_label_dataframe,
26+
livelihood_activity_label_recognition_dataframe,
2627
livelihood_activity_valid_instances,
2728
livelihood_summary_dataframe,
2829
livelihood_summary_label_dataframe,
@@ -93,6 +94,7 @@
9394
livelihood_summary_label_dataframe,
9495
all_livelihood_summary_labels_dataframe,
9596
summary_livelihood_summary_labels_dataframe,
97+
livelihood_activity_label_recognition_dataframe,
9698
livelihood_activity_instances,
9799
livelihood_activity_valid_instances,
98100
livelihood_activity_fixture,

pipelines/assets/fixtures.py

Lines changed: 169 additions & 143 deletions
Large diffs are not rendered by default.

pipelines/assets/livelihood_activity.py

Lines changed: 166 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@
5858
import django
5959
import pandas as pd
6060
from dagster import AssetExecutionContext, MetadataValue, Output, asset
61+
from django.db.models.functions import Lower
62+
from upath import UPath
6163

6264
from ..configs import BSSMetadataConfig
6365
from ..partitions import bss_instances_partitions_def
@@ -255,6 +257,49 @@ def get_livelihood_activity_regexes() -> list:
255257
return compiled_regexes
256258

257259

260+
@functools.cache
261+
def get_livelihood_activity_regular_expression_attributes(label: str) -> dict:
262+
"""
263+
Return a dict of the attributes for a well-known Livelihood Activity label using regular expression matches.
264+
"""
265+
label = prepare_lookup(label)
266+
attributes = {
267+
"activity_label": None,
268+
"strategy_type": None,
269+
"is_start": None,
270+
"product_id": None,
271+
"unit_of_measure_id": None,
272+
"season": None,
273+
"additional_identifier": None,
274+
"attribute": None,
275+
"notes": None,
276+
}
277+
for pattern, strategy_type, is_start, attribute in get_livelihood_activity_regexes():
278+
match = pattern.fullmatch(label)
279+
if match:
280+
attributes.update(match.groupdict())
281+
attributes["activity_label"] = label
282+
attributes["strategy_type"] = strategy_type
283+
attributes["is_start"] = is_start
284+
if isinstance(attribute, dict):
285+
# Attribute contains a dict of attributes, e.g. notes, etc.
286+
attributes.update(attribute)
287+
else:
288+
# Attribute is a string containing the attribute name
289+
attributes["attribute"] = attribute
290+
# Save the matched pattern to aid trouble-shooting
291+
attributes["notes"] = (
292+
attributes["notes"] + " " + f' r"{pattern.pattern}"'
293+
if attributes["notes"]
294+
else f'r"{pattern.pattern}"'
295+
)
296+
# Return the first matching pattern
297+
return attributes
298+
299+
# Didn't match any patterns, so return empty attributes
300+
return attributes
301+
302+
258303
@functools.cache
259304
def get_livelihood_activity_label_map(activity_type: str) -> dict[str, dict]:
260305
"""
@@ -306,40 +351,9 @@ def get_label_attributes(label: str, activity_type: str) -> pd.Series:
306351
try:
307352
return pd.Series(get_livelihood_activity_label_map(activity_type)[label])
308353
except KeyError:
309-
# No entry in the ActivityLabel model for this label, so attempt to match the label against the regexes
310-
attributes = {
311-
"activity_label": None,
312-
"strategy_type": None,
313-
"is_start": None,
314-
"product_id": None,
315-
"unit_of_measure_id": None,
316-
"season": None,
317-
"additional_identifier": None,
318-
"attribute": None,
319-
"notes": None,
320-
}
321-
for pattern, strategy_type, is_start, attribute in get_livelihood_activity_regexes():
322-
match = pattern.fullmatch(label)
323-
if match:
324-
attributes.update(match.groupdict())
325-
attributes["activity_label"] = label
326-
attributes["strategy_type"] = strategy_type
327-
attributes["is_start"] = is_start
328-
if isinstance(attribute, dict):
329-
# Attribute contains a dict of attributes, e.g. notes, etc.
330-
attributes.update(attribute)
331-
else:
332-
# Attribute is a string containing the attribute name
333-
attributes["attribute"] = attribute
334-
# Save the matched pattern to aid trouble-shooting
335-
attributes["notes"] = (
336-
attributes["notes"] + " " + f' r"{pattern.pattern}"'
337-
if attributes["notes"]
338-
else f'r"{pattern.pattern}"'
339-
)
340-
return pd.Series(attributes)
341-
# No pattern matched
342-
return pd.Series(attributes).fillna(pd.NA)
354+
# No entry in the ActivityLabel model instance for this label, so attempt to match against the regexes
355+
attributes = get_livelihood_activity_regular_expression_attributes(label)
356+
return pd.Series(attributes)
343357

344358

345359
def get_all_label_attributes(labels: pd.Series, activity_type: str, country_code: str | None) -> pd.DataFrame:
@@ -385,8 +399,91 @@ def get_all_label_attributes(labels: pd.Series, activity_type: str, country_code
385399
return all_label_attributes
386400

387401

402+
@asset
403+
def livelihood_activity_label_recognition_dataframe(
404+
context: AssetExecutionContext,
405+
config: BSSMetadataConfig,
406+
all_livelihood_activity_labels_dataframe: pd.DataFrame,
407+
all_other_cash_income_labels_dataframe: pd.DataFrame,
408+
all_wild_foods_labels_dataframe: pd.DataFrame,
409+
all_livelihood_summary_labels_dataframe: pd.DataFrame,
410+
):
411+
"""
412+
A saved spreadsheet showing how each BSS label is recognized, either from the ActivityLabel model or a regex.
413+
"""
414+
# Path to the output spreadsheet
415+
p = UPath(config.bss_label_recognition_workbook, **config.bss_label_recognition_storage_options)
416+
417+
all_livelihood_activity_labels_dataframe["activity_type"] = (
418+
ActivityLabel.LivelihoodActivityType.LIVELIHOOD_ACTIVITY
419+
)
420+
all_other_cash_income_labels_dataframe["activity_type"] = ActivityLabel.LivelihoodActivityType.OTHER_CASH_INCOME
421+
all_wild_foods_labels_dataframe["activity_type"] = ActivityLabel.LivelihoodActivityType.WILD_FOODS
422+
all_livelihood_summary_labels_dataframe["activity_type"] = ActivityLabel.LivelihoodActivityType.LIVELIHOOD_SUMMARY
423+
424+
# Build a dataframe of all the Activity Labels from all BSSs
425+
all_labels_df = pd.concat(
426+
[
427+
all_livelihood_activity_labels_dataframe,
428+
all_other_cash_income_labels_dataframe,
429+
all_wild_foods_labels_dataframe,
430+
all_livelihood_summary_labels_dataframe,
431+
],
432+
ignore_index=True,
433+
)
434+
435+
# Add the regular expressions
436+
regex_attributes_df = pd.DataFrame.from_records(
437+
all_labels_df["label"].astype(str).map(get_livelihood_activity_regular_expression_attributes)
438+
)
439+
all_labels_df = all_labels_df.join(
440+
regex_attributes_df,
441+
how="left",
442+
)
443+
444+
# Add the labels from the database
445+
db_labels_df = pd.DataFrame.from_records(
446+
ActivityLabel.objects.annotate(label_lower=Lower("activity_label")).values(
447+
"label_lower",
448+
"activity_type",
449+
"status",
450+
"strategy_type",
451+
"is_start",
452+
"product_id",
453+
"unit_of_measure_id",
454+
"currency_id",
455+
"season",
456+
"additional_identifier",
457+
"attribute",
458+
"notes",
459+
)
460+
)
461+
all_labels_df = all_labels_df.join(
462+
db_labels_df.set_index(["label_lower", "activity_type"]),
463+
on=("label_lower", "activity_type"),
464+
how="left",
465+
rsuffix="_db",
466+
lsuffix="_regex",
467+
)
468+
469+
# GDriveFS doesn't support updating existing files, it always creates a new file with the same name.
470+
# This leads to multiple files with the same name in the folder, so we delete any existing files first.
471+
if p.exists():
472+
# @TODO This doesn't work with the current version of gdrivefs, possibly because of an error
473+
# with accessing Shared Drives. For now, we need to manually delete the old files before running
474+
# the asset again.
475+
# We need to experiment and possibly create a custom gdrivefs that reuses code from KiLuigi's GoogleDriveTarget
476+
p.unlink()
477+
478+
# Save the dataframe to an Excel workbook
479+
with p.fs.open(p.path, mode="wb") as f:
480+
with pd.ExcelWriter(f, engine="openpyxl") as writer:
481+
all_labels_df.to_excel(writer, index=False, sheet_name="All Labels")
482+
483+
388484
def get_instances_from_dataframe(
389485
context: AssetExecutionContext,
486+
config: BSSMetadataConfig,
390487
df: pd.DataFrame,
391488
livelihood_zone_baseline: LivelihoodZoneBaseline,
392489
activity_type: str,
@@ -435,10 +532,14 @@ def get_instances_from_dataframe(
435532
)
436533

437534
# Check that we recognize all of the activity labels
535+
# The unrecognized labels are rows after the header rows where column A is not blank,
536+
# but the matching row in all_label_attributes dataframe has a blank activity_label.
537+
# Group the resulting dataframe so that we have a label and a list of the rows where it occurs.
438538
allow_unrecognized_labels = True
439539
unrecognized_labels = (
440540
df.iloc[num_header_rows:][
441-
(df["A"].iloc[num_header_rows:] != "") & (all_label_attributes.iloc[num_header_rows:, 0].isna())
541+
(df["A"].iloc[num_header_rows:] != "")
542+
& (all_label_attributes.iloc[num_header_rows:]["activity_label"] == "")
442543
]
443544
.groupby("A")
444545
.apply(lambda x: ", ".join(x.index.astype(str)), include_groups=False)
@@ -727,8 +828,9 @@ def get_instances_from_dataframe(
727828
for i, livelihood_activity in enumerate(livelihood_activities_for_strategy):
728829
livelihood_activity["livelihood_strategy"] = livelihood_zone_baseline_key + [
729830
livelihood_strategy["strategy_type"],
730-
livelihood_strategy["season"] if livelihood_strategy["season"] else "",
731-
livelihood_strategy["product_id"] if livelihood_strategy["product_id"] else "",
831+
livelihood_strategy["season"] or "", # Natural key components must be "" rather than None
832+
livelihood_strategy["product_id"]
833+
or "", # Natural key components must be "" rather than None
732834
livelihood_strategy["additional_identifier"],
733835
]
734836

@@ -1149,13 +1251,6 @@ def get_instances_from_dataframe(
11491251
% (partition_key, worksheet_name, row, label)
11501252
) from e
11511253

1152-
raise_errors = True
1153-
if errors and raise_errors:
1154-
errors = "\n".join(errors)
1155-
raise RuntimeError(
1156-
"Missing or inconsistent metadata in BSS %s worksheet '%s':\n%s" % (partition_key, worksheet_name, errors)
1157-
)
1158-
11591254
result = {
11601255
"LivelihoodStrategy": livelihood_strategies,
11611256
"LivelihoodActivity": livelihood_activities,
@@ -1177,6 +1272,19 @@ def get_instances_from_dataframe(
11771272
if not unrecognized_labels.empty:
11781273
metadata["unrecognized_labels"] = MetadataValue.md(unrecognized_labels.to_markdown(index=False))
11791274

1275+
if errors:
1276+
if config.strict:
1277+
raise RuntimeError(
1278+
"Missing or inconsistent metadata in BSS %s worksheet '%s':\n%s"
1279+
% (partition_key, worksheet_name, "\n".join(errors))
1280+
)
1281+
else:
1282+
context.log.error(
1283+
"Missing or inconsistent metadata in BSS %s worksheet '%s':\n%s"
1284+
% (partition_key, worksheet_name, "\n".join(errors))
1285+
)
1286+
metadata["errors"] = MetadataValue.md(f'```text\n{"\n".join(errors)}\n```')
1287+
11801288
return Output(
11811289
result,
11821290
metadata=metadata,
@@ -1185,6 +1293,7 @@ def get_instances_from_dataframe(
11851293

11861294
def get_annotated_instances_from_dataframe(
11871295
context: AssetExecutionContext,
1296+
config: BSSMetadataConfig,
11881297
livelihood_activity_dataframe: pd.DataFrame,
11891298
livelihood_summary_dataframe: pd.DataFrame,
11901299
activity_type: str,
@@ -1203,6 +1312,7 @@ def get_annotated_instances_from_dataframe(
12031312
# Get the detail LivelihoodStrategy and LivelihoodActivity instances
12041313
output = get_instances_from_dataframe(
12051314
context,
1315+
config,
12061316
livelihood_activity_dataframe,
12071317
livelihood_zone_baseline,
12081318
activity_type,
@@ -1214,6 +1324,7 @@ def get_annotated_instances_from_dataframe(
12141324
# Get the summary instances
12151325
reported_summary_output = get_instances_from_dataframe(
12161326
context,
1327+
config,
12171328
livelihood_summary_dataframe,
12181329
livelihood_zone_baseline,
12191330
ActivityLabel.LivelihoodActivityType.LIVELIHOOD_SUMMARY,
@@ -1325,7 +1436,9 @@ def get_annotated_instances_from_dataframe(
13251436
summary_df.replace(pd.NA, None).to_markdown(floatfmt=",.0f")
13261437
)
13271438

1328-
# Move the preview and metadata item to the end of the dict
1439+
# Move the preview and errors metadata item to the end of the dict
1440+
if "errors" in output.metadata:
1441+
output.metadata["errors"] = output.metadata.pop("errors")
13291442
output.metadata["preview"] = output.metadata.pop("preview")
13301443

13311444
return output
@@ -1334,26 +1447,27 @@ def get_annotated_instances_from_dataframe(
13341447
@asset(partitions_def=bss_instances_partitions_def, io_manager_key="json_io_manager")
13351448
def livelihood_activity_instances(
13361449
context: AssetExecutionContext,
1450+
config: BSSMetadataConfig,
13371451
livelihood_activity_dataframe: pd.DataFrame,
13381452
livelihood_summary_dataframe: pd.DataFrame,
13391453
) -> Output[dict]:
13401454
"""
13411455
LivelihoodStrategy and LivelihoodActivity instances extracted from the BSS.
13421456
"""
1343-
output = get_annotated_instances_from_dataframe(
1457+
return get_annotated_instances_from_dataframe(
13441458
context,
1459+
config,
13451460
livelihood_activity_dataframe,
13461461
livelihood_summary_dataframe,
1347-
ActivityLabel.LivelihoodActivityType.LIVELIHOOD_SUMMARY,
1462+
ActivityLabel.LivelihoodActivityType.LIVELIHOOD_ACTIVITY,
13481463
len(HEADER_ROWS),
13491464
)
13501465

1351-
return output
1352-
13531466

13541467
@asset(partitions_def=bss_instances_partitions_def, io_manager_key="json_io_manager")
13551468
def livelihood_activity_valid_instances(
13561469
context: AssetExecutionContext,
1470+
config: BSSMetadataConfig,
13571471
livelihood_activity_instances: dict,
13581472
wealth_characteristic_instances: dict,
13591473
) -> Output[dict]:
@@ -1369,16 +1483,7 @@ def livelihood_activity_valid_instances(
13691483
**{"WealthGroup": wealth_characteristic_instances["WealthGroup"]},
13701484
**livelihood_activity_instances,
13711485
}
1372-
valid_instances, metadata = validate_instances(context, livelihood_activity_instances, partition_key)
1373-
metadata = {f"num_{key.lower()}": len(value) for key, value in valid_instances.items()}
1374-
metadata["total_instances"] = sum(len(value) for value in valid_instances.values())
1375-
metadata["preview"] = MetadataValue.md(
1376-
f"```json\n{json.dumps(valid_instances, indent=4, ensure_ascii=False)}\n```"
1377-
)
1378-
return Output(
1379-
valid_instances,
1380-
metadata=metadata,
1381-
)
1486+
return validate_instances(context, config, livelihood_activity_instances, partition_key)
13821487

13831488

13841489
@asset(partitions_def=bss_instances_partitions_def, io_manager_key="json_io_manager")
@@ -1390,11 +1495,7 @@ def livelihood_activity_fixture(
13901495
"""
13911496
Django fixture for the Livelihood Activities from a BSS.
13921497
"""
1393-
fixture, metadata = get_fixture_from_instances(livelihood_activity_valid_instances)
1394-
return Output(
1395-
fixture,
1396-
metadata=metadata,
1397-
)
1498+
return get_fixture_from_instances(livelihood_activity_valid_instances)
13981499

13991500

14001501
@asset(partitions_def=bss_instances_partitions_def)
@@ -1405,8 +1506,4 @@ def imported_livelihood_activities(
14051506
"""
14061507
Imported Django fixtures for a BSS, added to the Django database.
14071508
"""
1408-
metadata = import_fixture(livelihood_activity_fixture)
1409-
return Output(
1410-
None,
1411-
metadata=metadata,
1412-
)
1509+
return import_fixture(livelihood_activity_fixture)

0 commit comments

Comments
 (0)