1717from pyspark .ml .tuning import ParamGridBuilder , CrossValidator , CrossValidatorModel
1818from pyspark .sql import DataFrame , SparkSession , Column
1919from pyspark .sql .functions import (to_date , col , lit , length , broadcast , udf , when , substring , lower , concat_ws ,
20- soundex , \
21- regexp_replace , trim , split , struct , arrays_zip , array , array_sort , current_date )
20+ soundex , regexp_replace , trim , split , struct , arrays_zip , array , array_sort ,
21+ current_date )
2222from pyspark .sql .pandas .functions import pandas_udf
2323from pyspark .sql .types import StructType , StructField , StringType , DateType , BooleanType , DoubleType
2424
@@ -79,9 +79,9 @@ def extract_person_name(name: str) -> (str, str, str, str, str):
7979 if not name or any (junk in name .casefold () for junk in junk_data ):
8080 return "Unknown" , None , None , None , None
8181
82- if any (business in name .casefold () for business in common_business_types ) or (
83- any (business in name .casefold () for business in common_business_types_small ) and not any (
84- t in name .casefold () for t in common_titles_subset_with_space )):
82+ if ( any (business in name .casefold () for business in common_business_types ) or (
83+ any (business in name .casefold () for business in common_business_types_small ) and not any (
84+ t in name .casefold () for t in common_titles_subset_with_space ))) :
8585 return "Business" , None , None , None , None
8686
8787 person_title , first_name , middle_name , last_name = None , None , None , None
@@ -114,7 +114,7 @@ def extract_person_name(name: str) -> (str, str, str, str, str):
114114 title_finder = [t for t in title_with_name if t .casefold () in common_titles ]
115115 person_title = " " .join (title_finder ) if len (title_finder ) else None
116116 remaining_name = [n for n in title_with_name if n .casefold () != (
117- person_title or "" ).casefold () and n .casefold () not in common_titles and n .casefold () not in ["." , "&" ]]
117+ person_title or "" ).casefold () and n .casefold () not in common_titles and n .casefold () not in ["." , "&" ]]
118118
119119 if len (remaining_name ) == 1 :
120120 first_name = remaining_name [0 ]
@@ -259,7 +259,7 @@ def prepare_clean_housing_data(person_reshape: DataFrame, assets_reshape: DataFr
259259 A prepared and cleaned dataframe containing housing tenancy data.
260260 """
261261 tenure_reshape = tenure_reshape .filter ((tenure_reshape ["endoftenuredate" ].isNull ()) | (
262- tenure_reshape ["endoftenuredate" ].cast (DateType ()) > current_date ()))
262+ tenure_reshape ["endoftenuredate" ].cast (DateType ()) > current_date ()))
263263
264264 assets_reshape = assets_reshape .filter (assets_reshape ['assettype' ] == 'Dwelling' )
265265
@@ -538,12 +538,12 @@ def prepare_clean_housing_benefit_data(hb_member_df: DataFrame, hb_household_df:
538538 housing_benefit_rent_assessment = hb_rent_assessment_df .withColumn ("source_filter" , when (
539539 (col ("dhp_ind" ) == 1 ) & (col ("type_ind" ) > 1 ), "DHP" ).otherwise ("HB" )).filter (
540540 (col ("from_date" ) < col ("import_date" )) & (col ("to_date" ) > col ("import_date" )) & (
541- (col ("type_ind" ) == 1 ) | (col ("dhp_ind" ) == 1 )) & (col ("model_amt" ) > 0 )).select (col ("claim_id" ),
542- col ("source_filter" ))
541+ (col ("type_ind" ) == 1 ) | (col ("dhp_ind" ) == 1 )) & (col ("model_amt" ) > 0 )).select (col ("claim_id" ),
542+ col ("source_filter" ))
543543
544544 housing_benefit_ctax_assessment = hb_ctax_assessment_df .withColumn ("source_filter" , lit ("CTS" )).filter (
545545 (col ("from_date" ) < col ("import_date" )) & (col ("to_date" ) > col ("import_date" )) & (col ("model_amt" ) > 0 ) & (
546- (col ("type_ind" ) == 1 ) | (col ("dhp_ind" ) == 1 ))).select (col ("claim_id" ), col ("source_filter" ))
546+ (col ("type_ind" ) == 1 ) | (col ("dhp_ind" ) == 1 ))).select (col ("claim_id" ), col ("source_filter" ))
547547
548548 housing_benefit_rent_ctax = housing_benefit_rent_assessment .union (housing_benefit_ctax_assessment )
549549
@@ -1106,7 +1106,7 @@ def remove_deceased(df: DataFrame) -> DataFrame:
11061106 A DataFrame after removing all the deceased persons.
11071107 """
11081108 deceased_filter_cond = (
1109- lower (col ("title" )).contains ("(deceased)" ) | lower (col ("title" )).contains ("executor" ) | lower (
1109+ lower (col ("title" )).contains ("(deceased)" ) | lower (col ("title" )).contains ("executor" ) | lower (
11101110 col ("title" )).contains ("exor" ) | lower (col ("title" )).contains ("rep" ) | lower (col ("title" )).contains (
11111111 " of" ) | lower (col ("title" )).contains ("of " ) | lower (col ("title" )).contains ("the" ) | lower (
11121112 col ("title" )).contains ("pe" ) | lower (col ("title" )).contains ("other" ))
@@ -1140,8 +1140,8 @@ def generate_possible_matches(df: DataFrame) -> DataFrame:
11401140 col ("last_name_soundex" ))
11411141
11421142 return df_a .join (df_b , (df_a ["a_source_id" ] != df_b ["b_source_id" ]) & (
1143- df_a ["first_name_soundex" ] == df_b ["first_name_soundex" ]) & (
1144- df_a ["last_name_soundex" ] == df_b ["last_name_soundex" ])).drop (
1143+ df_a ["first_name_soundex" ] == df_b ["first_name_soundex" ]) & (
1144+ df_a ["last_name_soundex" ] == df_b ["last_name_soundex" ])).drop (
11451145 * ["first_name_soundex" , "last_name_soundex" ])
11461146
11471147
@@ -1156,9 +1156,9 @@ def automatically_label_data(df: DataFrame) -> DataFrame:
11561156 A DataFrame with column auto_labels.
11571157 """
11581158 return df .withColumn ("auto_labels" , when ((col ("a_source_id" ) == col ("b_source_id" )) | (
1159- (col ("a_first_name" ) == col ("b_first_name" )) & (col ("a_last_name" ) == col ("b_last_name" )) & (
1160- col ("a_date_of_birth" ) == col ("b_date_of_birth" )) & (col ("a_uprn" ) == col ("b_uprn" )) & (
1161- col ("a_post_code" ) == col ("b_post_code" ))), lit (True )).otherwise (lit (None ).cast (BooleanType ())))
1159+ (col ("a_first_name" ) == col ("b_first_name" )) & (col ("a_last_name" ) == col ("b_last_name" )) & (
1160+ col ("a_date_of_birth" ) == col ("b_date_of_birth" )) & (col ("a_uprn" ) == col ("b_uprn" )) & (
1161+ col ("a_post_code" ) == col ("b_post_code" ))), lit (True )).otherwise (lit (None ).cast (BooleanType ())))
11621162
11631163
11641164@pandas_udf (features_schema )
@@ -1429,13 +1429,13 @@ def link_all_matched_persons(standard_df: DataFrame, predicted_df: DataFrame) ->
14291429
14301430 # Extra analysis (for analyst only): if you need to do.
14311431
1432- # To find how many connection are there # person_graph.inDegrees.filter(col("inDegree") > 1).orderBy(col(
1432+ # To find how many connections there are: # person_graph.inDegrees.filter(col("inDegree") > 1).orderBy(col(
14331433 # "inDegree").desc()).show(truncate=False)
14341434
14351435 # Graph query using motif to find where person 'a' is connected to person 'b', and person 'b' is also connected
14361436 # to # person 'a' # motif = person_graph.find("(a)-[]->(b); (b)-[]->(a)") # motif.show(truncate=False)
14371437
1438- # To count number of triangles i.e. a connected to b, b connected to c and c is connected back to a #
1438+ # To count the number of triangles, i.e. a connected to b, b connected to c, and c connected back to a:
14391439 # triangle_count = person_graph.triangleCount() # triangle_count.orderBy(col("count").desc()).show(n=10,
14401440 # truncate=False)
14411441
0 commit comments