diff --git a/demo-FSI/lakehouse-fsi-smart-claims/01-Data-Ingestion/01-SDP-Smart-Claims.py b/demo-FSI/lakehouse-fsi-smart-claims/01-Data-Ingestion/01-SDP-Smart-Claims.py index c6fff198..083ef570 100644 --- a/demo-FSI/lakehouse-fsi-smart-claims/01-Data-Ingestion/01-SDP-Smart-Claims.py +++ b/demo-FSI/lakehouse-fsi-smart-claims/01-Data-Ingestion/01-SDP-Smart-Claims.py @@ -122,4 +122,4 @@ # MAGIC # MAGIC The gold layer also includes geolocation enrichment using the open-source geopy library to add latitude/longitude coordinates to policy addresses, providing better visualization capabilities for Claims Investigators. # MAGIC -# MAGIC Go back to the [00-Smart-Claims-Introduction]($../../00-Smart-Claims-Introduction) to explore AI and Data Visualization. +# MAGIC Go back to the [00-Smart-Claims-Introduction]($../00-Smart-Claims-Introduction) to explore AI and Data Visualization. diff --git a/demo-FSI/lakehouse-fsi-smart-claims/01-Data-Ingestion/transformations/01-bronze.py b/demo-FSI/lakehouse-fsi-smart-claims/01-Data-Ingestion/transformations/01-bronze.py index 35bd9801..7d82f0bc 100644 --- a/demo-FSI/lakehouse-fsi-smart-claims/01-Data-Ingestion/transformations/01-bronze.py +++ b/demo-FSI/lakehouse-fsi-smart-claims/01-Data-Ingestion/transformations/01-bronze.py @@ -1,9 +1,6 @@ from pyspark import pipelines as dp from pyspark.sql import functions as F - -catalog = "main__build" -schema = dbName = db = "dbdemos_fsi_smart_claims" -volume_name = "volume_claims" +from utilities.utils import catalog, db, volume_name # ---------------------------------- # Ingest raw claims data from JSON files diff --git a/demo-FSI/lakehouse-fsi-smart-claims/01-Data-Ingestion/transformations/02-silver.py b/demo-FSI/lakehouse-fsi-smart-claims/01-Data-Ingestion/transformations/02-silver.py index eb1905f7..4e8cd5c8 100644 --- a/demo-FSI/lakehouse-fsi-smart-claims/01-Data-Ingestion/transformations/02-silver.py +++ b/demo-FSI/lakehouse-fsi-smart-claims/01-Data-Ingestion/transformations/02-silver.py 
@@ -2,15 +2,7 @@ from pyspark.sql import functions as F from pyspark.sql.functions import lit, concat, col from pyspark.sql import types as T - -# Helper function to flatten nested struct fields -def flatten_struct(df): - for field in df.schema.fields: - if isinstance(field.dataType, T.StructType): - for child in field.dataType: - df = df.withColumn(field.name + '_' + child.name, F.col(field.name + '.' + child.name)) - df = df.drop(field.name) - return df +from utilities import utils # ---------------------------------- # Aggregate telematics data by chassis number @@ -57,7 +49,7 @@ def policy(): def claim(): # Read the staged claim records into memory claim = spark.readStream.table("raw_claim") - claim = flatten_struct(claim) + claim = utils.flatten_struct(claim) # Update the format of all date/time features return (claim.withColumn("claim_date", F.to_date(F.col("claim_date"))) diff --git a/demo-FSI/lakehouse-fsi-smart-claims/01-Data-Ingestion/utilities/utils.py b/demo-FSI/lakehouse-fsi-smart-claims/01-Data-Ingestion/utilities/utils.py new file mode 100644 index 00000000..8cb2072b --- /dev/null +++ b/demo-FSI/lakehouse-fsi-smart-claims/01-Data-Ingestion/utilities/utils.py @@ -0,0 +1,15 @@ +from pyspark.sql import functions as F +from pyspark.sql.functions import col, lit, concat +from pyspark.sql import types as T + +catalog = "main__build" +schema = dbName = db = "dbdemos_fsi_smart_claims" +volume_name = "volume_claims" + +def flatten_struct(df): + for field in df.schema.fields: + if isinstance(field.dataType, T.StructType): + for child in field.dataType: + df = df.withColumn(field.name + '_' + child.name, F.col(field.name + '.' 
+ child.name))
+            df = df.drop(field.name)
+    return df
diff --git a/demo-FSI/lakehouse-fsi-smart-claims/_resources/bundle_config.py b/demo-FSI/lakehouse-fsi-smart-claims/_resources/bundle_config.py index 0a97b874..e5cc89fd 100644 --- a/demo-FSI/lakehouse-fsi-smart-claims/_resources/bundle_config.py +++ b/demo-FSI/lakehouse-fsi-smart-claims/_resources/bundle_config.py @@ -81,6 +81,14 @@ "title": "SDP - Gold transformations", "description": "Gold layer transformations." }, + { + "path": "01-Data-Ingestion/utilities/utils.py", + "pre_run": False, + "publish_on_website": True, + "add_cluster_setup_cell": False, + "title": "SDP - Pipeline utilities", + "description": "Shared configuration constants and helper functions used by the pipeline transformations." + }, { "path": "02-Data-Science-ML/02.1-Model-Training", "pre_run": True,