Commit 3c56d62

Updated log_exec_status and calls to it
1 parent eadc327 commit 3c56d62

4 files changed: +38 additions, -19 deletions


src/server/pipeline/clean_and_load_data.py

Lines changed: 1 addition & 1 deletion

@@ -8,7 +8,7 @@
 from flask import current_app
 import sqlalchemy
 from config import CURRENT_SOURCE_FILES_PATH
-
+from pipeline import log_db
 
 def start(connection, pdp_contacts_df, file_path_list):
     result = pd.DataFrame(columns=pdp_contacts_df.columns)

src/server/pipeline/flow_script.py

Lines changed: 9 additions & 2 deletions

@@ -1,7 +1,8 @@
 import os
+import datetime, time
 import pandas as pd
 from flask import current_app
-from pipeline import calssify_new_data, clean_and_load_data, archive_rows, match_data
+from pipeline import calssify_new_data, clean_and_load_data, archive_rows, match_data, log_db
 from config import CURRENT_SOURCE_FILES_PATH
 from config import engine
 from models import Base
@@ -10,6 +11,9 @@
 def start_flow():
     file_path_list = os.listdir(CURRENT_SOURCE_FILES_PATH)
 
+    job_id = str(int(time.time()))
+    log_db.log_exec_status(job_id, 'start_flow', 'executing', '')
+
     if file_path_list:
         with engine.connect() as connection:
             Base.metadata.create_all(connection)
@@ -25,6 +29,7 @@ def start_flow():
             # Populate new records in secondary tables (donations, volunteer shifts)
             # input - existing files in path
             # output - normalized object of all entries, as well as the input json rows for primary sources
+            log_db.log_exec_status(job_id, 'clean_and_load', 'executing', '')
            normalized_data, source_json, manual_matches_df = clean_and_load_data.start(connection, pdp_contacts_df, file_path_list)
 
             # Standardize column data types via postgres (e.g. reading a csv column as int vs. str)
@@ -34,14 +39,15 @@ def start_flow():
             normalized_data = pd.read_sql_table('_temp_pdp_contacts_loader', connection)
 
             # Classifies rows to old rows that haven't changed, updated rows and new rows - compared to the existing state of the DB
+            log_db.log_exec_status(job_id, 'classify', 'executing', '')
             rows_classified = calssify_new_data.start(pdp_contacts_df, normalized_data)
 
             # Archives rows that were updated in the current state of the DB (changes their archived_date to now)
             archive_rows.archive(connection, rows_classified["updated"])
 
             # Match new+updated records against previous version of pdp_contacts database, and
             # write these rows to the database.
-            match_data.start(connection, rows_classified, manual_matches_df)
+            match_data.start(connection, rows_classified, manual_matches_df, job_id)
 
             # Copy raw input rows to json fields in pdp_contacts,
             # using a temporary table to simplify the update code.
@@ -60,3 +66,4 @@ def start_flow():
 
     current_app.logger.info('Finished flow script run')
 
+    log_db.log_exec_status(job_id, 'flow', 'complete', '')
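With every stage now writing a row keyed by `job_id`, a caller can poll pipeline progress from the database. A minimal read-side sketch, assuming the `execution_status` schema from this commit and the shared `engine` from `config`; the `get_status` helper itself is hypothetical and not part of this change:

```python
import sqlalchemy
from config import engine  # same engine the pipeline modules use


def get_status(job_id: str):
    """Fetch the current stage/status row for a job, or None if not logged.

    Hypothetical helper: the execution_status table is defined in this
    commit, but no read-side function is shown in the change. Written
    against SQLAlchemy 1.x-style connection.execute() with bound params.
    """
    with engine.connect() as connection:
        row = connection.execute(
            sqlalchemy.text(
                "SELECT stage, status, details, update_stamp "
                "FROM execution_status WHERE job_id = :job_id"
            ),
            job_id=int(job_id),  # column is int4; job_id travels as str(int(time.time()))
        ).fetchone()
    return dict(row) if row else None
```

Because the upsert in `log_db` keeps one row per `job_id`, the single fetched row always reflects the most recently logged stage.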

src/server/pipeline/log_db.py

Lines changed: 22 additions & 9 deletions

@@ -11,24 +11,37 @@
 
 metadata = MetaData()
 
-kvt = Table("kv_unique", metadata, autoload=True, autoload_with=engine)
+ex_stat = Table("execution_status", metadata, autoload=True, autoload_with=engine)
 
+# Alembic version bfb1262d3195
 
+# CREATE TABLE public.execution_status (
+#     "_id" serial NOT NULL,
+#     job_id int4 NOT NULL,
+#     stage varchar(32) NOT NULL,
+#     status varchar(32) NOT NULL,
+#     details varchar(128) NOT NULL,
+#     update_stamp timestamp NOT NULL DEFAULT now(),
+#     CONSTRAINT execution_status_pkey null
+# );
 
-def log_exec_status(job_id: str, job_status: dict):
 
-    # Write Last Execution stats to DB
-    # See Alembic Revision ID: 05e0693f8cbb for table definition
+
+def log_exec_status(job_id: str, exec_stage: str, exec_status: str, job_details: str):
+    """Log execution status (job_id, stage, status, details) to DB."""
+
     with engine.connect() as connection:
-        ins_stmt = insert(kvt).values(  # Postgres-specific insert() supporting ON CONFLICT
-            keycol='job-' + job_id,
-            valcol=json.dumps(job_status)
+        ins_stmt = insert(ex_stat).values(  # Postgres-specific insert() supporting ON CONFLICT
+            job_id=job_id,
+            stage=exec_stage,
+            status=exec_status,
+            details=json.dumps(job_details)
         )
 
         # If key already present in DB, do update instead
         upsert = ins_stmt.on_conflict_do_update(
-            constraint='kv_unique_keycol_key',
-            set_=dict(valcol=json.dumps(job_status))
+            constraint='uq_job_id',
+            set_=dict(stage=exec_stage, status=exec_status, details=json.dumps(job_details))
         )
 
         try:
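The commented DDL cites Alembic revision bfb1262d3195, and `on_conflict_do_update` targets a constraint named `uq_job_id`, which implies a UNIQUE constraint on `job_id` so each job keeps exactly one status row. A sketch of what that migration could look like, reconstructed from the comment; the revision file itself is not in this commit, and `down_revision` is a placeholder:

```python
"""Assumed shape of the execution_status migration (not shown in this commit).

Reconstructed from the commented DDL in log_db.py. The unique constraint
name must match the one passed to on_conflict_do_update().
"""
import sqlalchemy as sa
from alembic import op

revision = 'bfb1262d3195'   # from the comment in log_db.py
down_revision = None        # placeholder: real parent revision unknown


def upgrade():
    op.create_table(
        'execution_status',
        sa.Column('_id', sa.Integer, primary_key=True),  # serial PK
        sa.Column('job_id', sa.Integer, nullable=False),
        sa.Column('stage', sa.String(32), nullable=False),
        sa.Column('status', sa.String(32), nullable=False),
        sa.Column('details', sa.String(128), nullable=False),
        sa.Column('update_stamp', sa.DateTime, nullable=False,
                  server_default=sa.text('now()')),
        sa.UniqueConstraint('job_id', name='uq_job_id'),  # ON CONFLICT target
    )


def downgrade():
    op.drop_table('execution_status')
```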

src/server/pipeline/match_data.py

Lines changed: 6 additions & 7 deletions

@@ -15,15 +15,16 @@ def normalize_before_match(value):
     return result
 
 
-def start(connection, added_or_updated_rows, manual_matches_df):
+def start(connection, added_or_updated_rows, manual_matches_df, job_id):
     # Match new records to each other and existing pdp_contacts data.
     # Assigns matching ID's to records, as well.
     # WARNING: not thread-safe and could lead to concurrency issues if two users /execute simultaneously
     current_app.logger.info('Start record matching')
     # Will need to consider updating the existing row contents (filter by active), deactivate,
     # try to match, and merge previous matching groups if applicable
-    job_id = str(int(time.time()))
-    log_db.log_exec_status(job_id, {'status': 'starting', 'at_row': 0, 'of_rows': 0})
+    # job_id = str(int(time.time()))
+    log_db.log_exec_status(job_id, 'matching', 'executing', '')
+
     current_app.logger.info("***** Running execute job ID " + job_id + " *****")
     items_to_update = pd.concat([added_or_updated_rows["new"], added_or_updated_rows["updated"]], ignore_index=True)
     pdp_contacts = pd.read_sql_table('pdp_contacts', connection)
@@ -55,9 +56,7 @@ def start(connection, added_or_updated_rows, manual_matches_df):
         current_app.logger.info("- Matching rows {}-{} of {}".format(
             row_num + 1, min(len(rows), row_num + row_print_freq), len(rows))
         )
-        log_db.log_exec_status(job_id, {
-            'status': 'executing', 'at_row': row_num + 1, 'of_rows': len(rows)
-        })
+        log_db.log_exec_status(job_id, 'matching', 'executing', str({'at_row': row_num + 1, 'of_rows': len(rows)}))
 
         # Exact matches based on specified columns
         row_matches = pdp_contacts[
@@ -103,4 +102,4 @@ def start(connection, added_or_updated_rows, manual_matches_df):
     items_to_update.to_sql('pdp_contacts', connection, index=False, if_exists='append')
     current_app.logger.info("- Finished load to pdp_contacts table")
 
-    log_db.log_exec_status(job_id, {'status': 'complete', 'at_row': len(rows), 'of_rows': len(rows)})
+    log_db.log_exec_status(job_id, 'matching', 'executing', str({'at_row': len(rows), 'of_rows': len(rows)}))
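Two things to note about these call sites: the final call after the load still reports status 'executing' (the run-level 'complete' row is written by flow_script), and `details` now stores `json.dumps()` of a Python-dict repr via `str({...})`, not a JSON object. A hedged sketch of decoding such a `details` value; no reader for this column appears in the commit, so the helper is hypothetical:

```python
import ast
import json


def parse_progress(details_cell: str) -> dict:
    """Decode a details value written by log_exec_status.

    match_data passes str({'at_row': ..., 'of_rows': ...}) and log_exec_status
    wraps it in json.dumps(), so the stored value is a JSON string containing
    a Python-dict repr: json.loads() strips the JSON layer, then
    ast.literal_eval() safely parses the repr. Hypothetical helper.
    """
    inner = json.loads(details_cell)  # -> "{'at_row': 1, 'of_rows': 10}" or ""
    return ast.literal_eval(inner) if inner else {}


# Example: a stored cell of '"{\'at_row\': 1, \'of_rows\': 10}"'
# decodes to {'at_row': 1, 'of_rows': 10}; the empty '' sentinel yields {}.
```

Writing `details` as a JSON object directly (i.e. passing a dict and letting `log_exec_status` serialize it once) would avoid the double encoding, but the sketch above matches what this commit actually stores.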
