Skip to content

Commit 87f8895

Browse files
committed
Print matching status less frequently
Bucketing every 5% by default instead of row-by-row
1 parent 80aa07a commit 87f8895

File tree

1 file changed

+10
-4
lines changed

1 file changed

+10
-4
lines changed

src/server/pipeline/match_data.py

Lines changed: 10 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -10,7 +10,6 @@ def start(connection, added_or_updated_rows):
1010
# Assigns matching ID's to records, as well.
1111
# WARNING: not thread-safe and could lead to concurrency issues if two users /execute simultaneously
1212
current_app.logger.info('Start record matching')
13-
current_app.logger.warning('Matching updated records not yet handled')
1413
# Will need to consider updating the existing row contents (filter by active), deactivate,
1514
# try to match, and merge previous matching groups if applicable
1615
items_to_update = pd.concat([added_or_updated_rows["new"], added_or_updated_rows["updated"]], ignore_index=True)
@@ -29,15 +28,20 @@ def start(connection, added_or_updated_rows):
2928
del row["_id"] # avoid specifying the _id field, so postgres will auto-increment for us
3029

3130
rows = items_to_update.to_dict(orient="records")
31+
row_print_freq = np.floor_divide(len(rows), 20) # approx every 5%
3232
for row_num, row in enumerate(rows):
33-
current_app.logger.info("- Matching row {} of {}".format(row_num+1, len(rows)))
33+
if row_num % row_print_freq == 0:
34+
current_app.logger.info("- Matching rows {}-{} of {}".format(
35+
row_num+1, min(len(rows), row_num+row_print_freq), len(rows))
36+
)
37+
3438
# Exact matches based on specified columns
3539
row_matches = pdp_contacts[
3640
(pdp_contacts["first_name"] == row["first_name"]) &
3741
(pdp_contacts["last_name"] == row["last_name"]) &
3842
(pdp_contacts["email"] == row["email"]) # TODO: could transform this line into an "or" with phone number
3943
]
40-
if row_matches.shape[0] == 0: # new record, no matching rows
44+
if row_matches.empty: # new record, no matching rows
4145
row_group = max_matching_group
4246
max_matching_group += 1
4347
else: # existing match(es)
@@ -51,7 +55,9 @@ def start(connection, added_or_updated_rows):
5155
# Updating local pdp_contacts dataframe instead of a roundtrip to postgres within the loop.
5256
# Indexing by iloc and vector of rows to keep the pd.DataFrame class and avoid implicit
5357
# casting to a single-typed pd.Series.
54-
pdp_contacts = pdp_contacts.append(items_to_update.iloc[[row_num], :])
58+
pdp_contacts = pdp_contacts.append(items_to_update.iloc[[row_num], :], ignore_index=True)
5559

5660
# Write new data and matching ID's to postgres in bulk, instead of line-by-line
61+
current_app.logger.info("- Writing data to pdp_contacts table")
5762
items_to_update.to_sql('pdp_contacts', connection, index=False, if_exists='append')
63+
current_app.logger.info("- Finished load to pdp_contacts table")

0 commit comments

Comments
 (0)