Skip to content

Commit 4360780

Browse files
committed
Logs match_data progress to DB
1 parent 644e2e7 commit 4360780

File tree

2 files changed

+49
-1
lines changed

2 files changed

+49
-1
lines changed

src/server/pipeline/log_db.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
from datetime import datetime
2+
import json
3+
from sqlalchemy.sql import text
4+
from flask import current_app
5+
6+
from sqlalchemy.dialects.postgresql import insert
7+
from sqlalchemy import Table, Column, Integer, String, MetaData, ForeignKey, exc, select
8+
9+
from config import engine
10+
11+
12+
metadata = MetaData()
13+
14+
kvt = Table("kv_unique", metadata, autoload=True, autoload_with=engine)
15+
16+
17+
18+
def log_exec_status(job_id: str, job_status: dict):
19+
20+
# Write Last Execution stats to DB
21+
# See Alembic Revision ID: 05e0693f8cbb for table definition
22+
with engine.connect() as connection:
23+
ins_stmt = insert(kvt).values( # Postgres-specific insert() supporting ON CONFLICT
24+
keycol = 'job-' + job_id,
25+
valcol = json.dumps(job_status)
26+
)
27+
28+
# If key already present in DB, do update instead
29+
upsert = ins_stmt.on_conflict_do_update(
30+
constraint='kv_unique_keycol_key',
31+
set_=dict(valcol=json.dumps(job_status))
32+
)
33+
34+
try:
35+
connection.execute(upsert)
36+
except Exception as e:
37+
current_app.logger.error("Insert/Update failed Execution status")
38+
current_app.logger.exception(e)
39+
40+

src/server/pipeline/match_data.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
1-
import datetime
1+
import datetime, time
22
import pandas as pd
33
import numpy as np
44

55
from flask import current_app
6+
from pipeline import log_db
7+
68

79

810
def start(connection, added_or_updated_rows):
@@ -12,6 +14,9 @@ def start(connection, added_or_updated_rows):
1214
current_app.logger.info('Start record matching')
1315
# Will need to consider updating the existing row contents (filter by active), deactivate,
1416
# try to match, and merge previous matching groups if applicable
17+
18+
job_id = str(int(time.time()))
19+
log_db.log_exec_status(job_id,{'status': 'starting', 'at_row': 0, 'of_rows':0})
1520
items_to_update = pd.concat([added_or_updated_rows["new"], added_or_updated_rows["updated"]], ignore_index=True)
1621
pdp_contacts = pd.read_sql_table('pdp_contacts', connection)
1722

@@ -32,6 +37,7 @@ def start(connection, added_or_updated_rows):
3237
current_app.logger.info("- Matching rows {}-{} of {}".format(
3338
row_num+1, min(len(rows), row_num+row_print_freq), len(rows))
3439
)
40+
log_db.log_exec_status(job_id,{'status': 'executing', 'at_row': row_num+1, 'of_rows':len(rows)})
3541

3642
# Exact matches based on specified columns
3743
row_matches = pdp_contacts[
@@ -59,3 +65,5 @@ def start(connection, added_or_updated_rows):
5965
current_app.logger.info("- Writing data to pdp_contacts table")
6066
items_to_update.to_sql('pdp_contacts', connection, index=False, if_exists='append')
6167
current_app.logger.info("- Finished load to pdp_contacts table")
68+
69+
log_db.log_exec_status(job_id,{'status': 'complete', 'at_row': len(rows), 'of_rows':len(rows)})

0 commit comments

Comments
 (0)