
Commit 10cb8fd

Merge pull request #167 from CodeForPhilly/faster-matching
Faster matching
2 parents: aa85a49 + a16d144

File tree: 1 file changed (+31, -29 lines)
src/server/pipeline/match_data.py

Lines changed: 31 additions & 29 deletions
```diff
@@ -5,19 +5,11 @@
 from flask import current_app
 
 
-# todo: match and load
-# Compare each new and updated item to all records in the DB
-# (including all other items that are new and updated this iteration) - for each item:
-# if it matches - it will get the same matching id as the match
-# if it doesn't - generate matching id (some prefix with increment?)
-# load it with created_at = now and archived_at = null
-
 def start(connection, added_or_updated_rows):
     # Match new records to each other and existing pdp_contacts data.
     # Assigns matching ID's to records, as well.
     # WARNING: not thread-safe and could lead to concurrency issues if two users /execute simultaneously
     current_app.logger.info('Start record matching')
-    current_app.logger.warning('Matching updated records not yet handled')
     # Will need to consider updating the existing row contents (filter by active), deactivate,
     # try to match, and merge previous matching groups if applicable
     items_to_update = pd.concat([added_or_updated_rows["new"], added_or_updated_rows["updated"]], ignore_index=True)
```
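The function's input, `added_or_updated_rows`, is a dict holding two DataFrames ("new" and "updated") that the last context line above concatenates into a single work list. For orientation, here is a minimal sketch of that shape; the keys and column names come from this file, while the source labels and people are invented:

```python
import pandas as pd

# Hypothetical input: dict keys and column names mirror match_data.py;
# the values themselves are made up for illustration.
added_or_updated_rows = {
    "new": pd.DataFrame([
        {"source_type": "system_a", "source_id": "123",
         "first_name": "Ada", "last_name": "Lovelace", "email": "ada@example.org"},
    ]),
    "updated": pd.DataFrame([
        {"source_type": "system_b", "source_id": "456",
         "first_name": "Grace", "last_name": "Hopper", "email": "grace@example.org"},
    ]),
}

# Same concatenation as start(): one work list covering both cases,
# reindexed 0..n-1 so positional row numbers line up with the index.
items_to_update = pd.concat(
    [added_or_updated_rows["new"], added_or_updated_rows["updated"]],
    ignore_index=True,
)
print(items_to_update[["first_name", "email"]])
```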
```diff
@@ -26,36 +18,46 @@ def start(connection, added_or_updated_rows):
     if pdp_contacts["matching_id"].dropna().size == 0:
         max_matching_group = 0
     else:
-        max_matching_group = max(pdp_contacts["matching_id"].dropna()) + 1
+        max_matching_group = max(pdp_contacts["matching_id"].dropna())
 
-    # Iterate over the dataframe using integer index location,
-    # because iterrows returns a type-inconsistent series, and itertuples would be more complex.
-    num_added_or_updated = items_to_update.shape[0]
-    for row_num in range(num_added_or_updated):
-        current_app.logger.info("- Matching row {} of {}".format(row_num+1, num_added_or_updated))
-        row = items_to_update.iloc[[row_num], :].copy()  # pd.DataFrame
+    # Initialize column metadata we'll write to pdp_contacts
+    items_to_update["matching_id"] = 0  # initialize as int; overwritten in the loop
+    items_to_update["archived_date"] = np.nan
+    items_to_update["created_date"] = datetime.datetime.now()
+    if "_id" in items_to_update.columns:
+        del items_to_update["_id"]  # avoid specifying the _id field, so postgres will auto-increment for us
+
+    rows = items_to_update.to_dict(orient="records")
+    row_print_freq = max(1, np.floor_divide(len(rows), 20))  # approx every 5% (or every row if small)
+    for row_num, row in enumerate(rows):
+        if row_num % row_print_freq == 0:
+            current_app.logger.info("- Matching rows {}-{} of {}".format(
+                row_num+1, min(len(rows), row_num+row_print_freq), len(rows))
+            )
 
         # Exact matches based on specified columns
         row_matches = pdp_contacts[
-            (pdp_contacts["first_name"] == row["first_name"].values[0]) &
-            (pdp_contacts["last_name"] == row["last_name"].values[0]) &
-            (pdp_contacts["email"] == row["email"].values[0])
+            (pdp_contacts["first_name"] == row["first_name"]) &
+            (pdp_contacts["last_name"] == row["last_name"]) &
+            (pdp_contacts["email"] == row["email"])  # TODO: could transform this line into an "or" with phone number
         ]
-        if row_matches.shape[0] == 0:  # new record, no matching rows
-            row_group = max_matching_group
+        if row_matches.empty:  # new record, no matching rows
             max_matching_group += 1
+            row_group = max_matching_group
         else:  # existing match(es)
             row_group = row_matches["matching_id"].values[0]
             if not all(row_matches["matching_id"] == row_group):
                 current_app.logger.warning(
                     "Source {} with ID {} is matching multiple groups in pdp_contacts ({})"
                     .format(row["source_type"], row["source_id"], str(row_matches["matching_id"].drop_duplicates()))
                 )
-        row["created_date"] = datetime.datetime.now()
-        row["archived_date"] = np.nan
-        row["matching_id"] = row_group
-        if "_id" in row.columns:
-            del row["_id"]  # avoid specifying the _id field, so postgres will auto-increment for us
-
-        # Round-trip to the database on every loop iteration is inefficient and could be rewritten much faster
-        row.to_sql('pdp_contacts', connection, index=False, if_exists='append')
-        pdp_contacts = pd.read_sql_table('pdp_contacts', connection)
+        items_to_update.loc[row_num, "matching_id"] = row_group
+        # Update the local pdp_contacts dataframe instead of a round trip to postgres within the loop.
+        # Index by iloc with a list of rows to keep the pd.DataFrame class and avoid implicit
+        # casting to a single-typed pd.Series.
+        pdp_contacts = pdp_contacts.append(items_to_update.iloc[[row_num], :], ignore_index=True)
+
+    # Write new data and matching IDs to postgres in bulk, instead of line-by-line
+    current_app.logger.info("- Writing data to pdp_contacts table")
+    items_to_update.to_sql('pdp_contacts', connection, index=False, if_exists='append')
+    current_app.logger.info("- Finished load to pdp_contacts table")
```

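The diff also throttles progress logging from one line per row to roughly one line per 5% of rows. A standalone sketch of that arithmetic, with `print` standing in for `current_app.logger.info` so it runs outside Flask:

```python
import numpy as np

rows = list(range(103))  # stand-in for items_to_update.to_dict(orient="records")

# Log roughly every 5% of rows; for fewer than 20 rows, log every row.
row_print_freq = max(1, np.floor_divide(len(rows), 20))

for row_num, row in enumerate(rows):
    if row_num % row_print_freq == 0:
        # For 103 rows this prints "1-5", "6-10", ..., "101-103".
        print("- Matching rows {}-{} of {}".format(
            row_num + 1, min(len(rows), row_num + row_print_freq), len(rows)))
```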