6
6
from pipeline import log_db
7
7
8
8
9
def normalize_before_match(value):
    """Normalize a contact-field value for case-insensitive matching.

    Strings are lowercased so that e.g. "John" and "JOHN" compare equal.
    Any non-string value (None, NaN for a missing pandas field, a numeric
    id, ...) is returned unchanged rather than mapped to ``None``:
    collapsing all non-strings to ``None`` would make two *different*
    non-string values compare equal after normalization, and would also
    make a pair of missing values (``None == None``) spuriously match.

    :param value: raw field value taken from a contact row.
    :return: the lowercased string, or ``value`` unchanged when it is
             not a string.
    """
    if isinstance(value, str):
        return value.lower()
    # Pass non-strings through untouched so NaN != NaN semantics (and
    # distinct non-string values) are preserved by the callers' == checks.
    return value
16
+
9
17
10
18
def start (connection , added_or_updated_rows ):
11
19
# Match new records to each other and existing pdp_contacts data.
@@ -16,7 +24,7 @@ def start(connection, added_or_updated_rows):
16
24
# try to match, and merge previous matching groups if applicable
17
25
18
26
job_id = str (int (time .time ()))
19
- log_db .log_exec_status (job_id ,{'status' : 'starting' , 'at_row' : 0 , 'of_rows' :0 })
27
+ log_db .log_exec_status (job_id , {'status' : 'starting' , 'at_row' : 0 , 'of_rows' : 0 })
20
28
current_app .logger .info ("***** Running execute job ID " + job_id + " *****" )
21
29
items_to_update = pd .concat ([added_or_updated_rows ["new" ], added_or_updated_rows ["updated" ]], ignore_index = True )
22
30
pdp_contacts = pd .read_sql_table ('pdp_contacts' , connection )
@@ -36,22 +44,32 @@ def start(connection, added_or_updated_rows):
36
44
for row_num , row in enumerate (rows ):
37
45
if row_num % row_print_freq == 0 :
38
46
current_app .logger .info ("- Matching rows {}-{} of {}" .format (
39
- row_num + 1 , min (len (rows ), row_num + row_print_freq ), len (rows ))
47
+ row_num + 1 , min (len (rows ), row_num + row_print_freq ), len (rows ))
40
48
)
41
- log_db .log_exec_status (job_id ,{'status' : 'executing' , 'at_row' : row_num + 1 , 'of_rows' :len (rows )})
49
+ log_db .log_exec_status (job_id , {
50
+ 'status' : 'executing' , 'at_row' : row_num + 1 , 'of_rows' : len (rows )
51
+ })
42
52
43
53
# Exact matches based on specified columns
54
+
44
55
row_matches = pdp_contacts [
45
56
(
46
- ((pdp_contacts ["first_name" ] == row ["first_name" ]) &
47
- (pdp_contacts ["last_name" ] == row ["last_name" ]))
57
+ ((pdp_contacts ["first_name" ].apply (lambda x : normalize_before_match (x )) == normalize_before_match (
58
+ row ["first_name" ])) &
59
+ (pdp_contacts ["last_name" ].apply (lambda x : normalize_before_match (x )) == normalize_before_match (
60
+ row ["last_name" ])))
48
61
|
49
- ((pdp_contacts ["first_name" ] == row ["last_name" ]) &
50
- (pdp_contacts ["last_name" ] == row ["first_name" ]))
62
+ ((pdp_contacts ["first_name" ].apply (lambda x : normalize_before_match (x )) == normalize_before_match (
63
+ row [
64
+ "last_name" ])) &
65
+ (pdp_contacts ["last_name" ].apply (lambda x : normalize_before_match (x )) == normalize_before_match (
66
+ row [
67
+ "first_name" ])))
51
68
&
52
- ((pdp_contacts ["email" ] == row ["email" ]) | (pdp_contacts ["mobile" ] == row ["mobile" ]))
69
+ ((pdp_contacts ["email" ].apply (lambda x : normalize_before_match (x )) == normalize_before_match (
70
+ row ["email" ])) | (
71
+ pdp_contacts ["mobile" ] == row ["mobile" ]))
53
72
)
54
-
55
73
]
56
74
if row_matches .empty : # new record, no matching rows
57
75
max_matching_group += 1
@@ -74,4 +92,4 @@ def start(connection, added_or_updated_rows):
74
92
items_to_update .to_sql ('pdp_contacts' , connection , index = False , if_exists = 'append' )
75
93
current_app .logger .info ("- Finished load to pdp_contacts table" )
76
94
77
- log_db .log_exec_status (job_id ,{'status' : 'complete' , 'at_row' : len (rows ), 'of_rows' :len (rows )})
95
+ log_db .log_exec_status (job_id , {'status' : 'complete' , 'at_row' : len (rows ), 'of_rows' : len (rows )})
0 commit comments