Skip to content

Commit 7501c89

Browse files
committed
Merge branch 'table_for_file' of https://github.com/CodeForPhilly/paws-data-pipeline into execute-adjustments
# Conflicts:
#	src/server/pipeline/match_data.py
2 parents dd03fb9 + 9b3f914 commit 7501c89

File tree

5 files changed

+68
-9
lines changed

5 files changed

+68
-9
lines changed

src/server/api/admin_api.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,20 @@ def list_statistics():
131131
return last_execution_details
132132

133133

134+
@admin_api.route("/api/get_execution_status/<int:job_id>", methods=["GET"])
def get_exec_status(job_id):
    """Return the stored execution-status value for one pipeline run.

    Looks up the key ``job-<job_id>`` in the ``kv_unique`` table and returns
    the associated ``valcol`` (a JSON string written by
    ``log_db.log_exec_status`` — see that module).

    :param job_id: integer run identifier (the route converter guarantees int).
    :returns: the status JSON string, or ``("{}", 404)`` when no such job
        has ever been recorded.
    """
    with engine.connect() as connection:
        s_jobid = 'job-' + str(job_id)
        # Parameterized query — never interpolate the key into the SQL text.
        s = text("select valcol from kv_unique where keycol = :j ;")
        s = s.bindparams(j=s_jobid)
        result = connection.execute(s)
        row = result.fetchone()

    if row is None:
        # Unknown job id: fetchone() returns None, and the previous
        # ``fetchone()[0]`` raised TypeError and surfaced as a 500.
        # Report a clean 404 with an empty JSON object instead.
        return "{}", 404

    return row[0]
134148
"""
135149
@admin_api.route('/api/status', methods=['GET'])
136150
def checkStatus():

src/server/app.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
app.secret_key = APP_SECRET_KEY
1717
app.config["MAX_CONTENT_LENGTH"] = 500 * 1024 * 1024 # 500 Megs
1818
app.config["SEND_FILE_MAX_AGE_DEFAULT"] = 0
19+
1920
from api.admin_api import admin_api
2021
from api.common_api import common_api
2122
from api.user_api import user_api
@@ -24,6 +25,8 @@
2425
app.register_blueprint(common_api)
2526
app.register_blueprint(user_api)
2627

28+
app.logger.setLevel('INFO')
29+
2730
# init_db_schema.start(connection)
2831

2932

src/server/config.py

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -76,10 +76,3 @@
7676
os.makedirs(CURRENT_SOURCE_FILES_PATH, exist_ok=True)
7777
os.makedirs(REPORT_PATH, exist_ok=True)
7878
os.makedirs(ZIPPED_FILES, exist_ok=True)
79-
80-
if not (os.path.exists(LOGS_PATH + "last_execution.json")):
81-
f = open(
82-
LOGS_PATH + "last_execution.json", "w"
83-
) # Prevent 500 error from /api/statistics
84-
f.write("{}")
85-
f.close()

src/server/pipeline/log_db.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
from datetime import datetime
2+
import json
3+
from sqlalchemy.sql import text
4+
from flask import current_app
5+
6+
from sqlalchemy.dialects.postgresql import insert
7+
from sqlalchemy import Table, Column, Integer, String, MetaData, ForeignKey, exc, select
8+
9+
from config import engine
10+
11+
12+
metadata = MetaData()

# Reflect the key/value table once at import time rather than per call.
# See Alembic Revision ID: 05e0693f8cbb for the table definition.
kvt = Table("kv_unique", metadata, autoload=True, autoload_with=engine)


def log_exec_status(job_id: str, job_status: dict):
    """Upsert the execution status of one pipeline run into ``kv_unique``.

    The status is stored under the key ``job-<job_id>``; an existing entry
    for the same job is overwritten (Postgres ``ON CONFLICT DO UPDATE``).

    :param job_id: run identifier string (the callers pass a stringified
        Unix timestamp).
    :param job_status: JSON-serializable status dict, e.g.
        ``{'status': 'executing', 'at_row': 10, 'of_rows': 100}``.

    Failures are logged and swallowed deliberately — status logging is
    best-effort and must not abort the pipeline run itself.
    """
    # Serialize once; the same payload is used for both the INSERT values
    # and the ON CONFLICT update (the original serialized it twice).
    status_json = json.dumps(job_status)

    with engine.connect() as connection:
        # Postgres-specific insert() supporting ON CONFLICT
        ins_stmt = insert(kvt).values(
            keycol='job-' + job_id,
            valcol=status_json,
        )

        # If key already present in DB, do update instead
        upsert = ins_stmt.on_conflict_do_update(
            constraint='kv_unique_keycol_key',
            set_=dict(valcol=status_json),
        )

        try:
            connection.execute(upsert)
        except Exception as e:
            current_app.logger.error("Insert/Update failed Execution status")
            current_app.logger.exception(e)

src/server/pipeline/match_data.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
1-
import datetime
1+
import datetime, time
22
import pandas as pd
33
import numpy as np
44

55
from flask import current_app
6+
from pipeline import log_db
7+
68

79

810
def start(connection, added_or_updated_rows):
@@ -12,6 +14,10 @@ def start(connection, added_or_updated_rows):
1214
current_app.logger.info('Start record matching')
1315
# Will need to consider updating the existing row contents (filter by active), deactivate,
1416
# try to match, and merge previous matching groups if applicable
17+
18+
job_id = str(int(time.time()))
19+
log_db.log_exec_status(job_id,{'status': 'starting', 'at_row': 0, 'of_rows':0})
20+
current_app.logger.info("Running execute job ID " + job_id)
1521
items_to_update = pd.concat([added_or_updated_rows["new"], added_or_updated_rows["updated"]], ignore_index=True)
1622
pdp_contacts = pd.read_sql_table('pdp_contacts', connection)
1723

@@ -30,8 +36,9 @@ def start(connection, added_or_updated_rows):
3036
for row_num, row in enumerate(rows):
3137
if row_num % row_print_freq == 0:
3238
current_app.logger.info("- Matching rows {}-{} of {}".format(
33-
row_num + 1, min(len(rows), row_num + row_print_freq), len(rows))
39+
row_num+1, min(len(rows), row_num+row_print_freq), len(rows))
3440
)
41+
log_db.log_exec_status(job_id,{'status': 'executing', 'at_row': row_num+1, 'of_rows':len(rows)})
3542

3643
# Exact matches based on specified columns
3744
row_matches = pdp_contacts[
@@ -66,3 +73,5 @@ def start(connection, added_or_updated_rows):
6673
current_app.logger.info("- Writing data to pdp_contacts table")
6774
items_to_update.to_sql('pdp_contacts', connection, index=False, if_exists='append')
6875
current_app.logger.info("- Finished load to pdp_contacts table")
76+
77+
log_db.log_exec_status(job_id,{'status': 'complete', 'at_row': len(rows), 'of_rows':len(rows)})

0 commit comments

Comments
 (0)