Commit 4681add

Merge pull request #275 from CodeForPhilly/manual_matches
Manual matches
2 parents: 70b5530 + 2a35e79

4 files changed: 29 additions, 14 deletions

src/server/datasource_manager.py
Lines changed: 2 additions & 1 deletion

```diff
@@ -21,7 +21,8 @@ def __clean_csv_headers(header):
         'Account ID (18 digit)',
         'Opportunity Name', 'Stage', 'Fiscal Period', 'Amount', 'Probability (%)', 'Age',
         'Close Date', 'Created Date', 'Type', 'Primary Campaign Source',
-        'Source', 'Contact ID (18 Digit)', 'Primary Contact']
+        'Source', 'Contact ID (18 Digit)', 'Primary Contact'],
+    'manualmatches': ['salesforcecontacts', 'volgistics', 'shelterluvpeople']
 }
 
 DATASOURCE_MAPPING = {
```
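
For context, a minimal sketch of the kind of CSV the new 'manualmatches' entry implies: one column per source system, named exactly as in the list above, with each row linking one person's IDs across systems. The ID values here are invented for illustration; blank cells (no record in that system) are what the fillna handling in clean_and_load_data.py below deals with.

```python
import io
import pandas as pd

# Hypothetical manualmatches upload: each row links one person's IDs
# across the three source systems; a blank cell means no ID there.
csv_text = """salesforcecontacts,volgistics,shelterluvpeople
0031a000001AbCdEF,12345,
0031a000001GhIjKL,,67890
"""

manual_matches_df = pd.read_csv(io.StringIO(csv_text))
print(manual_matches_df)
```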

src/server/pipeline/clean_and_load_data.py
Lines changed: 8 additions & 2 deletions

```diff
@@ -13,10 +13,16 @@
 def start(connection, pdp_contacts_df, file_path_list):
     result = pd.DataFrame(columns=pdp_contacts_df.columns)
     json_rows = pd.DataFrame(columns=["source_type", "source_id", "json"])
-
+    manual_matches_df = None
+
     for uploaded_file in file_path_list:
         file_path = os.path.join(CURRENT_SOURCE_FILES_PATH, uploaded_file)
         table_name = file_path.split('/')[-1].split('-')[0]
+        if table_name == 'manualmatches':
+            manual_matches_df = pd.read_csv((io.BytesIO(open(file_path, "rb").read())), encoding='iso-8859-1')
+            manual_matches_df[["volgistics", "shelterluvpeople"]] = manual_matches_df[["volgistics", "shelterluvpeople"]].fillna(0).astype(int).astype(str)
+            continue
+
         current_app.logger.info('Running load_paws_data on: ' + uploaded_file)
 
         df = pd.read_csv((io.BytesIO(open(file_path, "rb").read())), encoding='iso-8859-1')
@@ -54,7 +60,7 @@ def start(connection, pdp_contacts_df, file_path_list):
 
     current_app.logger.info(' - Finish load_paws_data on: ' + uploaded_file)
 
-    return result, json_rows
+    return result, json_rows, manual_matches_df
 
 
 def create_normalized_df(df, normalized_df, table_name):
```
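
The `.fillna(0).astype(int).astype(str)` chain deserves a note: when a CSV column contains blank cells, pandas parses the surviving numeric IDs as floats, so converting straight to strings would yield values like '12345.0' that could never equal the string IDs stored in pdp_contacts. A self-contained illustration of the problem and the round-trip fix:

```python
import io
import pandas as pd

csv_text = """salesforcecontacts,volgistics,shelterluvpeople
a1,12345,
a2,,67890
"""
df = pd.read_csv(io.StringIO(csv_text))

# Blank cells force the ID columns to float dtype, so a naive string
# conversion yields values that can never match: ['12345.0', 'nan'].
print(df["volgistics"].astype(str).tolist())

# The pipeline's round-trip through int produces clean ID strings,
# with missing IDs becoming the sentinel string '0': ['12345', '0'].
cols = ["volgistics", "shelterluvpeople"]
df[cols] = df[cols].fillna(0).astype(int).astype(str)
print(df["volgistics"].tolist())
```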

src/server/pipeline/flow_script.py
Lines changed: 2 additions & 2 deletions

```diff
@@ -25,7 +25,7 @@ def start_flow():
     # Populate new records in secondary tables (donations, volunteer shifts)
     # input - existing files in path
     # output - normalized object of all entries, as well as the input json rows for primary sources
-    normalized_data, source_json = clean_and_load_data.start(connection, pdp_contacts_df, file_path_list)
+    normalized_data, source_json, manual_matches_df = clean_and_load_data.start(connection, pdp_contacts_df, file_path_list)
 
     # Standardize column data types via postgres (e.g. reading a csv column as int vs. str)
     # (If additional inconsistencies are encountered, may need to enforce the schema of
@@ -41,7 +41,7 @@ def start_flow():
 
     # Match new+updated records against previous version of pdp_contacts database, and
     # write these rows to the database.
-    match_data.start(connection, rows_classified)
+    match_data.start(connection, rows_classified, manual_matches_df)
 
     # Copy raw input rows to json fields in pdp_contacts,
     # using a temporary table to simplify the update code.
```

src/server/pipeline/match_data.py
Lines changed: 17 additions & 9 deletions

```diff
@@ -15,14 +15,13 @@ def normalize_before_match(value):
     return result
 
 
-def start(connection, added_or_updated_rows):
+def start(connection, added_or_updated_rows, manual_matches_df):
     # Match new records to each other and existing pdp_contacts data.
     # Assigns matching ID's to records, as well.
     # WARNING: not thread-safe and could lead to concurrency issues if two users /execute simultaneously
     current_app.logger.info('Start record matching')
     # Will need to consider updating the existing row contents (filter by active), deactivate,
     # try to match, and merge previous matching groups if applicable
-
     job_id = str(int(time.time()))
     log_db.log_exec_status(job_id, {'status': 'starting', 'at_row': 0, 'of_rows': 0})
     current_app.logger.info("***** Running execute job ID " + job_id + " *****")
@@ -63,15 +62,24 @@ def start(connection, added_or_updated_rows):
         # Exact matches based on specified columns
         row_matches = pdp_contacts[
             (
-                ((pdp_contacts["first_name_normalized"] == row["first_name_normalized"]) &
-                (pdp_contacts["last_name_normalized"] == row["last_name_normalized"]))
-                |
-                ((pdp_contacts["first_name_normalized"] == row["last_name_normalized"]) &
-                (pdp_contacts["last_name_normalized"] == row["first_name_normalized"]))
-                &
-                ((pdp_contacts["email_normalized"] == row["email_normalized"]) | (pdp_contacts["mobile"] == row["mobile"]))
+                ((pdp_contacts["first_name_normalized"] == row["first_name_normalized"]) &
+                 (pdp_contacts["last_name_normalized"] == row["last_name_normalized"]))
+                |
+                ((pdp_contacts["first_name_normalized"] == row["last_name_normalized"]) &
+                 (pdp_contacts["last_name_normalized"] == row["first_name_normalized"]))
+                &
+                ((pdp_contacts["email_normalized"] == row["email_normalized"]) | (pdp_contacts["mobile"] == row["mobile"]))
             )
         ]
+        # collect other linked ids from manual matches source
+        if manual_matches_df is not None:
+            linked_ids = manual_matches_df[(manual_matches_df[row["source_type"]] == row["source_id"])]
+            ids = linked_ids.to_dict(orient="records")
+            for row_dict in ids:
+                for column, value in row_dict.items():
+                    row_matches = row_matches.append(pdp_contacts[(pdp_contacts["source_type"] == column) & (pdp_contacts["source_id"] == value)])
+
+
         if row_matches.empty:  # new record, no matching rows
             max_matching_group += 1
             row_group = max_matching_group
```
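
To see what the new manual-match branch does in isolation, here is a small self-contained sketch of the same lookup: take the row being matched, find the manual-match rows that mention its ID under its own source column, and gather every pdp_contacts row whose (source_type, source_id) pair appears in those rows. All frames and IDs are invented for illustration, and pd.concat stands in for DataFrame.append, which newer pandas versions have removed:

```python
import pandas as pd

# Hypothetical stand-ins for the pipeline's real data.
pdp_contacts = pd.DataFrame({
    "source_type": ["salesforcecontacts", "volgistics", "shelterluvpeople"],
    "source_id":   ["a1", "12345", "67890"],
})
manual_matches_df = pd.DataFrame({
    "salesforcecontacts": ["a1"],
    "volgistics": ["12345"],
    "shelterluvpeople": ["67890"],
})

# The record currently being matched, as in the loop above.
row = {"source_type": "salesforcecontacts", "source_id": "a1"}

# Manual-match rows that mention this record's ID.
linked_ids = manual_matches_df[manual_matches_df[row["source_type"]] == row["source_id"]]

# Every pdp_contacts row whose (source_type, source_id) pair
# appears in one of the linked manual-match rows.
matches = [
    pdp_contacts[(pdp_contacts["source_type"] == column) &
                 (pdp_contacts["source_id"] == value)]
    for row_dict in linked_ids.to_dict(orient="records")
    for column, value in row_dict.items()
]
row_matches = pd.concat(matches) if matches else pdp_contacts.iloc[0:0]
print(row_matches)
```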
