```diff
@@ -122,4 +122,4 @@
 # MAGIC
 # MAGIC The gold layer also includes geolocation enrichment using the open-source geopy library to add latitude/longitude coordinates to policy addresses, providing better visualization capabilities for Claims Investigators.
 # MAGIC
-# MAGIC Go back to the [00-Smart-Claims-Introduction]($../../00-Smart-Claims-Introduction) to explore AI and Data Visualization.
+# MAGIC Go back to the [00-Smart-Claims-Introduction]($../00-Smart-Claims-Introduction) to explore AI and Data Visualization.
```
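The notebook cell above mentions geopy-based geolocation enrichment of policy addresses. For context only, here is a minimal sketch of how such enrichment could look; the `policy` table name, the `address` column, and the UDF-based approach are assumptions rather than the demo's actual code, and `spark` is the notebook's session:

```python
# Illustrative sketch of geopy-based geocoding (assumed table/column names).
from geopy.geocoders import Nominatim
from pyspark.sql import functions as F
from pyspark.sql import types as T

@F.udf(T.StructType([
    T.StructField("latitude", T.DoubleType()),
    T.StructField("longitude", T.DoubleType()),
]))
def geocode_address(address):
    # Instantiated per call for simplicity; a real pipeline would batch
    # lookups, cache results, and respect Nominatim's rate limits.
    geolocator = Nominatim(user_agent="smart_claims_demo")
    location = geolocator.geocode(address)
    if location is None:
        return None  # address could not be resolved
    return (location.latitude, location.longitude)

policy = (spark.table("policy")
          .withColumn("geo", geocode_address("address"))
          .withColumn("latitude", F.col("geo.latitude"))
          .withColumn("longitude", F.col("geo.longitude"))
          .drop("geo"))
```

Geocoding over HTTP is slow and rate-limited, so in practice you would geocode only new or changed addresses rather than the full table on every run.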
```diff
@@ -1,9 +1,6 @@
 from pyspark import pipelines as dp
 from pyspark.sql import functions as F
-
-catalog = "main__build"
-schema = dbName = db = "dbdemos_fsi_smart_claims"
-volume_name = "volume_claims"
+from utilities.utils import catalog, db, volume_name
 
 # ----------------------------------
 # Ingest raw claims data from JSON files
```
```diff
@@ -2,15 +2,7 @@
 from pyspark.sql import functions as F
 from pyspark.sql.functions import lit, concat, col
 from pyspark.sql import types as T
-
-# Helper function to flatten nested struct fields
-def flatten_struct(df):
-    for field in df.schema.fields:
-        if isinstance(field.dataType, T.StructType):
-            for child in field.dataType:
-                df = df.withColumn(field.name + '_' + child.name, F.col(field.name + '.' + child.name))
-            df = df.drop(field.name)
-    return df
+from utilities import utils
 
 # ----------------------------------
 # Aggregate telematics data by chassis number
@@ -57,7 +49,7 @@ def policy():
 def claim():
     # Read the staged claim records into memory
     claim = spark.readStream.table("raw_claim")
-    claim = flatten_struct(claim)
+    claim = utils.flatten_struct(claim)
 
     # Update the format of all date/time features
     return (claim.withColumn("claim_date", F.to_date(F.col("claim_date")))
```
```diff
@@ -0,0 +1,15 @@
+from pyspark.sql import functions as F
+from pyspark.sql.functions import col, lit, concat
+from pyspark.sql import types as T
+
+catalog = "main__build"
+schema = dbName = db = "dbdemos_fsi_smart_claims"
+volume_name = "volume_claims"
+
+def flatten_struct(df):
+    for field in df.schema.fields:
+        if isinstance(field.dataType, T.StructType):
+            for child in field.dataType:
+                df = df.withColumn(field.name + '_' + child.name, F.col(field.name + '.' + child.name))
+            df = df.drop(field.name)
+    return df
```
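For reference, a quick usage sketch of the new `flatten_struct` helper, showing how one level of struct nesting is promoted to top-level `<struct>_<field>` columns; the toy DataFrame is invented for illustration:

```python
# Hypothetical example: flatten a DataFrame with one nested struct column.
from utilities.utils import flatten_struct

df = spark.createDataFrame(
    [(1, ("Jane", "Doe"))],
    "id INT, driver STRUCT<first: STRING, last: STRING>",
)
flatten_struct(df).printSchema()
# root
#  |-- id: integer (nullable = true)
#  |-- driver_first: string (nullable = true)
#  |-- driver_last: string (nullable = true)
```

Note that the helper flattens only one level: a struct nested inside another struct would surface as a struct-typed `<outer>_<inner>` column.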
```diff
@@ -81,6 +81,14 @@
       "title": "SDP - Gold transformations",
       "description": "Gold layer transformations."
     },
+    {
+      "path": "01-Data-Ingestion/utilities/utils.py",
+      "pre_run": False,
+      "publish_on_website": True,
+      "add_cluster_setup_cell": False,
+      "title": "SDP - Shared utilities",
+      "description": "Shared configuration and helper functions."
+    },
     {
       "path": "02-Data-Science-ML/02.1-Model-Training",
       "pre_run": True,
```