@@ -39,8 +39,18 @@ def start(connection, added_or_updated_rows):
39
39
items_to_update ["archived_date" ] = np .nan
40
40
items_to_update ["created_date" ] = datetime .datetime .now ()
41
41
42
+ # Create Normalized columns for matching
43
+ items_to_update ["first_name_normalized" ] = items_to_update ["first_name" ].apply (normalize_before_match )
44
+ items_to_update ["last_name_normalized" ] = items_to_update ["last_name" ].apply (normalize_before_match )
45
+ items_to_update ["email_normalized" ] = items_to_update ["email" ].apply (normalize_before_match )
46
+
47
+ pdp_contacts ["first_name_normalized" ] = pdp_contacts ["first_name" ].apply (normalize_before_match )
48
+ pdp_contacts ["last_name_normalized" ] = pdp_contacts ["last_name" ].apply (normalize_before_match )
49
+ pdp_contacts ["email_normalized" ] = pdp_contacts ["email" ].apply (normalize_before_match )
50
+
42
51
rows = items_to_update .to_dict (orient = "records" )
43
52
row_print_freq = max (1 , np .floor_divide (len (rows ), 20 )) # approx every 5% (or every row if small)
53
+
44
54
for row_num , row in enumerate (rows ):
45
55
if row_num % row_print_freq == 0 :
46
56
current_app .logger .info ("- Matching rows {}-{} of {}" .format (
@@ -51,24 +61,15 @@ def start(connection, added_or_updated_rows):
51
61
})
52
62
53
63
# Exact matches based on specified columns
54
-
55
64
row_matches = pdp_contacts [
56
65
(
57
- ((pdp_contacts ["first_name" ].apply (lambda x : normalize_before_match (x )) == normalize_before_match (
58
- row ["first_name" ])) &
59
- (pdp_contacts ["last_name" ].apply (lambda x : normalize_before_match (x )) == normalize_before_match (
60
- row ["last_name" ])))
66
+ ((pdp_contacts ["first_name_normalized" ] == row ["first_name_normalized" ]) &
67
+ (pdp_contacts ["last_name_normalized" ] == row ["last_name_normalized" ]))
61
68
|
62
- ((pdp_contacts ["first_name" ].apply (lambda x : normalize_before_match (x )) == normalize_before_match (
63
- row [
64
- "last_name" ])) &
65
- (pdp_contacts ["last_name" ].apply (lambda x : normalize_before_match (x )) == normalize_before_match (
66
- row [
67
- "first_name" ])))
69
+ ((pdp_contacts ["first_name_normalized" ] == row ["last_name_normalized" ]) &
70
+ (pdp_contacts ["last_name_normalized" ] == row ["first_name_normalized" ]))
68
71
&
69
- ((pdp_contacts ["email" ].apply (lambda x : normalize_before_match (x )) == normalize_before_match (
70
- row ["email" ])) | (
71
- pdp_contacts ["mobile" ] == row ["mobile" ]))
72
+ ((pdp_contacts ["email_normalized" ] == row ["email_normalized" ]) | (pdp_contacts ["mobile" ] == row ["mobile" ]))
72
73
)
73
74
]
74
75
if row_matches .empty : # new record, no matching rows
@@ -89,6 +90,8 @@ def start(connection, added_or_updated_rows):
89
90
90
91
# Write new data and matching ID's to postgres in bulk, instead of line-by-line
91
92
current_app .logger .info ("- Writing data to pdp_contacts table" )
93
+ items_to_update = items_to_update .drop (
94
+ columns = ["first_name_normalized" , "last_name_normalized" , "email_normalized" ])
92
95
items_to_update .to_sql ('pdp_contacts' , connection , index = False , if_exists = 'append' )
93
96
current_app .logger .info ("- Finished load to pdp_contacts table" )
94
97
0 commit comments